You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:32 UTC
[34/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-186]
Implement the OpenMessaging specification 0.1.0-alpha version
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000..84b6c2d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState {
+    /**
+     * The task is still in progress.
+     **/
+    DOING(0),
+    /**
+     * The task has completed.
+     **/
+    DONE(1),
+    /**
+     * The task has been cancelled.
+     **/
+    CANCELLED(2);
+
+    /**
+     * Numeric code attached to this state (0, 1 or 2 for the constants above).
+     **/
+    public final int value;
+
+    // Enum constructors are implicitly private.
+    FutureState(int code) {
+        value = code;
+    }
+
+    /**
+     * @return {@code true} only when this state is {@link #CANCELLED}
+     **/
+    public boolean isCancelledState() {
+        return CANCELLED == this;
+    }
+
+    /**
+     * @return {@code true} only when this state is {@link #DONE}
+     **/
+    public boolean isDoneState() {
+        return DONE == this;
+    }
+
+    /**
+     * @return {@code true} only when this state is {@link #DOING}
+     **/
+    public boolean isDoingState() {
+        return DOING == this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
new file mode 100644
index 0000000..104d3d9
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.slf4j.Logger;
+
+public final class BeanUtils {
+ final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
+     * {@code void} is mapped to itself. The reference is final; the map is only
+     * filled by the static initializer below.
+     */
+    private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
+
+    static {
+        primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
+        primitiveWrapperMap.put(Byte.TYPE, Byte.class);
+        primitiveWrapperMap.put(Character.TYPE, Character.class);
+        primitiveWrapperMap.put(Short.TYPE, Short.class);
+        primitiveWrapperMap.put(Integer.TYPE, Integer.class);
+        primitiveWrapperMap.put(Long.TYPE, Long.class);
+        primitiveWrapperMap.put(Double.TYPE, Double.class);
+        primitiveWrapperMap.put(Float.TYPE, Float.class);
+        primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
+    }
+
+    /**
+     * Inverse of {@link #primitiveWrapperMap}: maps wrapper {@code Class}es back to their
+     * primitive {@code Class}. Entries that map a class to itself (i.e. {@code void}) are
+     * left out, and {@code String} is additionally mapped to itself.
+     */
+    private static final Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
+
+    static {
+        // Iterate over entries so each pair is read once instead of a keySet() walk plus get().
+        for (final Map.Entry<Class<?>, Class<?>> entry : primitiveWrapperMap.entrySet()) {
+            final Class<?> primitiveClass = entry.getKey();
+            final Class<?> wrapperClass = entry.getValue();
+            if (!primitiveClass.equals(wrapperClass)) {
+                wrapperMap.put(wrapperClass, primitiveClass);
+            }
+        }
+        wrapperMap.put(String.class, String.class);
+    }
+
+    /**
+     * Creates a new instance of {@code clazz} through its no-argument constructor and fills
+     * it by delegating to {@link #populate(Properties, Object)}.
+     *
+     * <p>Failures are never propagated: every {@code Throwable} is logged at WARN level.
+     * If the instance cannot be created, {@code null} is returned; if populating it
+     * fails, the instance created so far is returned.</p>
+     *
+     * @param properties name/value pairs used to populate the new instance
+     * @param clazz class to instantiate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} when instantiation failed
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        final T instance;
+        try {
+            instance = clazz.newInstance();
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+            return null;
+        }
+        try {
+            return populate(properties, instance);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+            return instance;
+        }
+    }
+
+    /**
+     * Creates a new instance of {@code clazz} through its no-argument constructor and fills
+     * it by delegating to {@link #populate(KeyValue, Object)}.
+     *
+     * <p>Failures are never propagated: every {@code Throwable} is logged at WARN level.
+     * If the instance cannot be created, {@code null} is returned; if populating it
+     * fails, the instance created so far is returned.</p>
+     *
+     * @param properties key/value pairs used to populate the new instance
+     * @param clazz class to instantiate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} when instantiation failed
+     */
+    public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
+        final T instance;
+        try {
+            instance = clazz.newInstance();
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+            return null;
+        }
+        try {
+            return populate(properties, instance);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+            return instance;
+        }
+    }
+
+    /**
+     * Returns the type of the first parameter of a public method of {@code clazz} whose
+     * name matches {@code methodName}, ignoring case.
+     *
+     * <p>Matching methods that declare no parameters are skipped; previously such a match
+     * raised an {@code ArrayIndexOutOfBoundsException}.</p>
+     *
+     * @param clazz class whose public methods (as returned by {@link Class#getMethods()}) are searched
+     * @param methodName method name to look for, compared case-insensitively
+     * @return the first parameter type of the first suitable method, or {@code null} if there is none
+     */
+    public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
+        for (Method method : clazz.getMethods()) {
+            if (!method.getName().equalsIgnoreCase(methodName)) {
+                continue;
+            }
+            Class<?>[] parameterTypes = method.getParameterTypes();
+            if (parameterTypes.length > 0) {
+                return parameterTypes[0];
+            }
+        }
+        return null;
+    }
+
+ public static void setProperties(Class<?> clazz, Object obj, String methodName,
+ Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Class<?> parameterClass = getMethodClass(clazz, methodName);
+ Method setterMethod = clazz.getMethod(methodName, parameterClass);
+ if (parameterClass == Boolean.TYPE) {
+ setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
+ } else if (parameterClass == Integer.TYPE) {
+ setterMethod.invoke(obj, Integer.valueOf(value.toString()));
+ } else if (parameterClass == Double.TYPE) {
+ setterMethod.invoke(obj, Double.valueOf(value.toString()));
+ } else if (parameterClass == Float.TYPE) {
+ setterMethod.invoke(obj, Float.valueOf(value.toString()));
+ } else if (parameterClass == Long.TYPE) {
+ setterMethod.invoke(obj, Long.valueOf(value.toString()));
+ } else
+ setterMethod.invoke(obj, value);
+ }
+
+    /**
+     * Populates {@code obj} from {@code properties} by calling, for every entry, the setter
+     * whose name is derived from the entry key (see {@link #toSetterName(String)}).
+     *
+     * <p>Entries without a usable setter are skipped silently. A {@code RuntimeException}
+     * raised while processing an entry (for example an unparsable number) is logged at
+     * WARN level and stops the processing of the remaining entries.</p>
+     *
+     * @param properties name/value pairs to apply
+     * @param obj bean to populate; must not be {@code null}
+     * @param <T> bean type
+     * @return {@code obj}, for chaining
+     */
+    public static <T> T populate(final Properties properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+            Set<Map.Entry<Object, Object>> entries = properties.entrySet();
+            for (Map.Entry<Object, Object> entry : entries) {
+                String setterName = toSetterName(entry.getKey().toString());
+                try {
+                    setProperties(clazz, obj, setterName, entry.getValue());
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    // Best effort: keys that do not correspond to a usable setter are skipped.
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Populates {@code obj} from {@code properties} by calling, for every key, the setter
+     * whose name is derived from the key (see {@link #toSetterName(String)}) with the
+     * key's string value.
+     *
+     * <p>Keys without a usable setter are skipped silently. A {@code RuntimeException}
+     * raised while processing a key is logged at WARN level and stops the processing of
+     * the remaining keys.</p>
+     *
+     * @param properties key/value pairs to apply
+     * @param obj bean to populate; must not be {@code null}
+     * @param <T> bean type
+     * @return {@code obj}, for chaining
+     */
+    public static <T> T populate(final KeyValue properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+            final Set<String> keySet = properties.keySet();
+            for (String key : keySet) {
+                String setterName = toSetterName(key);
+                try {
+                    setProperties(clazz, obj, setterName, properties.getString(key));
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    // Best effort: keys that do not correspond to a usable setter are skipped.
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Derives a setter name from a dotted property key: the key is split on {@code '.'},
+     * every segment is lower-cased and capitalized, and the segments are concatenated
+     * after the prefix {@code "set"} (e.g. {@code "a.bc.d"} becomes {@code "setABcD"}).
+     *
+     * <p>Lower-casing uses {@code Locale.ROOT} so the result does not depend on the JVM
+     * default locale (e.g. the Turkish dotless i).</p>
+     */
+    private static String toSetterName(final String propertyKey) {
+        StringBuilder setterName = new StringBuilder("set");
+        for (String segment : propertyKey.split("\\.")) {
+            setterName.append(StringUtils.capitalize(segment.toLowerCase(java.util.Locale.ROOT)));
+        }
+        return setterName.toString();
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
new file mode 100644
index 0000000..60c8408
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+public class OMSUtil {
+
+    /**
+     * Builds an OMS client instance name of the form
+     * {@code <pid>%OpenMessaging%<System.nanoTime()>}.
+     *
+     * @return the generated instance name
+     */
+    public static String buildInstanceName() {
+        final StringBuilder instanceName = new StringBuilder();
+        instanceName.append(Integer.toString(UtilAll.getPid()));
+        instanceName.append("%OpenMessaging");
+        instanceName.append("%");
+        instanceName.append(System.nanoTime());
+        return instanceName.toString();
+    }
+
+    /**
+     * Converts an OMS {@code BytesMessage} into a RocketMQ message.
+     *
+     * <p>The body is copied as is. The RocketMQ topic is taken from the {@code TOPIC}
+     * header when present, otherwise from the {@code QUEUE} header, and the kind of
+     * destination is recorded in the {@code MESSAGE_DESTINATION} user property. All OMS
+     * properties are then copied, followed by all OMS headers, so a header overrides a
+     * property with the same key.</p>
+     *
+     * @param omsMessage the OMS message to convert
+     * @return the equivalent RocketMQ message
+     */
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+        final org.apache.rocketmq.common.message.Message converted =
+            new org.apache.rocketmq.common.message.Message();
+        converted.setBody(omsMessage.getBody());
+
+        final KeyValue omsHeaders = omsMessage.headers();
+        final KeyValue omsProperties = omsMessage.properties();
+
+        //All destinations in RocketMQ use Topic
+        final boolean hasTopicHeader = omsHeaders.containsKey(MessageHeader.TOPIC);
+        final String destinationHeader = hasTopicHeader ? MessageHeader.TOPIC : MessageHeader.QUEUE;
+        converted.setTopic(omsHeaders.getString(destinationHeader));
+        converted.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, hasTopicHeader ? "TOPIC" : "QUEUE");
+
+        for (String propertyKey : omsProperties.keySet()) {
+            MessageAccessor.putProperty(converted, propertyKey, omsProperties.getString(propertyKey));
+        }
+
+        //Headers are written last so that they take priority over properties
+        for (String headerKey : omsHeaders.keySet()) {
+            MessageAccessor.putProperty(converted, headerKey, omsHeaders.getString(headerKey));
+        }
+
+        return converted;
+    }
+
+    /**
+     * Converts a RocketMQ {@code MessageExt} into an OMS {@code BytesMessage}.
+     *
+     * <p>Every RocketMQ property whose key is an OMS header name (see
+     * {@link #isOMSHeader(String)}) is copied into the OMS headers, all others into the
+     * OMS properties. The message id, destination, search key, born/store host and
+     * born/store timestamp headers are then set from the RocketMQ message. The topic is
+     * exposed as an OMS {@code QUEUE} only when the {@code MESSAGE_DESTINATION} property
+     * is present with a value other than {@code "TOPIC"}; otherwise it is a {@code TOPIC}.</p>
+     *
+     * @param rmqMsg the RocketMQ message to convert
+     * @return the equivalent OMS message
+     */
+    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
+        BytesMessage omsMsg = new BytesMessageImpl();
+        omsMsg.setBody(rmqMsg.getBody());
+
+        KeyValue headers = omsMsg.headers();
+        KeyValue properties = omsMsg.properties();
+
+        // NOTE(review): getProperties() presumably returns null for a message on which no
+        // property was ever set; guard against it instead of failing with an NPE - confirm.
+        final Map<String, String> rmqProperties = rmqMsg.getProperties();
+        String destination = null;
+        if (rmqProperties != null) {
+            for (final Map.Entry<String, String> entry : rmqProperties.entrySet()) {
+                if (isOMSHeader(entry.getKey())) {
+                    headers.put(entry.getKey(), entry.getValue());
+                } else {
+                    properties.put(entry.getKey(), entry.getValue());
+                }
+            }
+            destination = rmqProperties.get(NonStandardKeys.MESSAGE_DESTINATION);
+        }
+
+        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
+        // A missing (or null) destination marker defaults to TOPIC; the constant-first
+        // comparison avoids an NPE on a null property value.
+        if (destination == null || "TOPIC".equals(destination)) {
+            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
+        } else {
+            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
+        }
+        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
+        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        return omsMsg;
+    }
+
+    /**
+     * Tells whether {@code value} equals the value of one of the fields declared by
+     * {@code MessageHeader}.
+     *
+     * <p>A field that cannot be read, or whose value is {@code null}, is skipped so that
+     * the remaining fields are still examined.</p>
+     *
+     * @param value candidate header name
+     * @return {@code true} if {@code value} matches a {@code MessageHeader} field value
+     */
+    public static boolean isOMSHeader(String value) {
+        for (Field field : MessageHeader.class.getDeclaredFields()) {
+            try {
+                final Object headerName = field.get(MessageHeader.class);
+                if (headerName != null && headerName.equals(value)) {
+                    return true;
+                }
+            } catch (IllegalAccessException ignored) {
+                // An unreadable field cannot match; keep scanning the other fields
+                // instead of giving up on the whole lookup.
+                continue;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Converts a RocketMQ {@code SendResult} into an OMS {@code SendResult} that carries the
+     * RocketMQ message id and a newly created {@code KeyValue}.
+     *
+     * <p>The result is expected to have status {@code SEND_OK}; this is only checked by
+     * an {@code assert}, i.e. when assertions are enabled.</p>
+     *
+     * @param rmqResult the RocketMQ send result
+     * @return the OMS send result
+     */
+    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
+        final String messageId = rmqResult.getMsgId();
+        final KeyValue keyValue = OMS.newKeyValue();
+        return new SendResultImpl(messageId, keyValue);
+    }
+
+    /**
+     * Merges several {@code KeyValue}s into a newly created one. Every key is copied with
+     * its string value, in argument order, so a later argument overwrites an earlier one
+     * on key collision.
+     *
+     * @param keyValues the sources to merge
+     * @return a new {@code KeyValue} holding the merged entries
+     */
+    public static KeyValue buildKeyValue(KeyValue... keyValues) {
+        final KeyValue merged = OMS.newKeyValue();
+        for (int i = 0; i < keyValues.length; i++) {
+            final KeyValue source = keyValues[i];
+            for (String name : source.keySet()) {
+                merged.put(name, source.getString(name));
+            }
+        }
+        return merged;
+    }
+
+    /**
+     * Returns an iterator that cycles indefinitely over the elements of {@code iterable}.
+     *
+     * <p>A fresh iterator is requested from {@code iterable} each time the current pass is
+     * exhausted. {@code hasNext()} is {@code false} only when a fresh pass would be empty,
+     * in which case {@code next()} throws {@code NoSuchElementException}. {@code remove()}
+     * is forwarded to the iterator of the current pass and does nothing before the first
+     * call to {@code next()}.</p>
+     */
+    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+        return new Iterator<T>() {
+            // Iterator of the pass in progress; null until next() is called for the first time.
+            private Iterator<T> currentPass;
+
+            @Override
+            public synchronized boolean hasNext() {
+                if (currentPass != null && currentPass.hasNext()) {
+                    return true;
+                }
+                return iterable.iterator().hasNext();
+            }
+
+            @Override
+            public synchronized T next() {
+                if (currentPass == null || !currentPass.hasNext()) {
+                    // Start a new pass over the source.
+                    currentPass = iterable.iterator();
+                    if (!currentPass.hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                }
+                return currentPass.next();
+            }
+
+            @Override
+            public synchronized void remove() {
+                if (currentPass != null) {
+                    currentPass.remove();
+                }
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
new file mode 100644
index 0000000..ae4d3ed
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code LocalMessageCache}, run against a mocked {@code DefaultMQPullConsumer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class LocalMessageCacheTest {
+    private LocalMessageCache localMessageCache;
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsume;
+    @Mock
+    private ConsumeRequest consumeRequest;
+
+    /**
+     * Builds the cache under test with a pull batch size of 512 and a capacity of 1024.
+     */
+    @Before
+    public void init() {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setRmqPullMessageBatchNums(512);
+        clientConfig.setRmqPullMessageCacheCapacity(1024);
+        localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
+    }
+
+    /**
+     * The next batch size is 512 for an empty cache and 511 once 513 requests were submitted.
+     */
+    @Test
+    public void testNextPullBatchNums() throws Exception {
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
+        for (int i = 0; i < 513; i++) {
+            localMessageCache.submitConsumeRequest(consumeRequest);
+        }
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511);
+    }
+
+    /**
+     * Without a locally recorded offset, the offset reported by the consumer is returned.
+     */
+    @Test
+    public void testNextPullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
+            .thenReturn(123L);
+        // Query with the queue declared above rather than with yet another instance.
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(123L);
+    }
+
+    /**
+     * An offset stored through updatePullOffset is the one returned by nextPullOffset.
+     */
+    @Test
+    public void testUpdatePullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        localMessageCache.updatePullOffset(messageQueue, 124L);
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
+    }
+
+    /**
+     * A submitted consume request makes its message available through poll().
+     */
+    @Test
+    public void testSubmitConsumeRequest() throws Exception {
+        byte[] body = new byte[] {'1', '2', '3'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(body);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+
+        when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
+        localMessageCache.submitConsumeRequest(consumeRequest);
+        assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
new file mode 100644
index 0000000..277a5c6
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the OMS pull consumer, with the RocketMQ pull consumer mocked and the
+ * local message cache replaced by a Mockito spy.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PullConsumerImplTest {
+    private PullConsumer consumer;
+    private String queueName = "HELLO_QUEUE";
+
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsumer;
+    // Created in init(): a field initializer would run before Mockito injects
+    // rocketmqPullConsumer, so the cache would be built around a null consumer.
+    private LocalMessageCache localMessageCache;
+
+    /**
+     * Creates the consumer through the OMS access point, then swaps its RocketMQ consumer
+     * and local cache for the mock and the spy using reflection.
+     */
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPullConsumer(queueName,
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
+
+        Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
+        field.setAccessible(true);
+        field.set(consumer, rocketmqPullConsumer); //Replace
+
+        field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
+        field.setAccessible(true);
+        field.set(consumer, localMessageCache);
+
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    /**
+     * A message returned by the cache is exposed by poll() with its id and body.
+     */
+    @Test
+    public void testPoll() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic(queueName);
+
+        // Note: stubbing a spy with when(...) invokes the real poll() once while recording.
+        when(localMessageCache.poll()).thenReturn(consumedMsg);
+
+        Message message = consumer.poll();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+    }
+
+    /**
+     * With nothing cached, poll() returns null, both with the default timeout and with an
+     * explicit one.
+     */
+    @Test
+    public void testPoll_WithTimeout() {
+        //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
+        Message message = consumer.poll();
+        assertThat(message).isNull();
+
+        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        assertThat(message).isNull();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
new file mode 100644
index 0000000..882e57e
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for the OMS push consumer: the RocketMQ push consumer inside the OMS
+ * consumer is replaced by a mock, and a message is fed directly to the registered
+ * RocketMQ message listener.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PushConsumerImplTest {
+ private PushConsumer consumer;
+
+ @Mock
+ private DefaultMQPushConsumer rocketmqPushConsumer;
+
+ /**
+ * Creates the consumer through the OMS access point, swaps its RocketMQ consumer for
+ * the mock using reflection, and makes the mock return the listener that the original
+ * inner consumer had registered.
+ */
+ @Before
+ public void init() throws NoSuchFieldException, IllegalAccessException {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ consumer = messagingAccessPoint.createPushConsumer(
+ OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+ Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
+ field.setAccessible(true);
+ // Keep a handle on the real inner consumer before it is replaced: its listener is needed below.
+ DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
+ field.set(consumer, rocketmqPushConsumer); // Replace the real consumer with the mock
+
+ when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
+ messagingAccessPoint.startup();
+ consumer.startup();
+ }
+
+ /**
+ * Attaches an OMS listener to HELLO_QUEUE, then hands a RocketMQ message to the
+ * RocketMQ listener; the OMS listener asserts on the message id and body it receives.
+ */
+ @Test
+ public void testConsumeMessage() {
+ final byte[] testBody = new byte[] {'a', 'b'};
+
+ MessageExt consumedMsg = new MessageExt();
+ consumedMsg.setMsgId("NewMsgId");
+ consumedMsg.setBody(testBody);
+ consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+ consumedMsg.setTopic("HELLO_QUEUE");
+ consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
+ @Override
+ public void onMessage(final Message message, final ReceivedMessageContext context) {
+ assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+ assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+ context.ack();
+ }
+ });
+ // Drive the RocketMQ-side listener directly; the consume context is passed as null.
+ ((MessageListenerConcurrently) rocketmqPushConsumer
+ .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
new file mode 100644
index 0000000..1db80c3
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the OMS producer, with the underlying RocketMQ producer replaced by a mock.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerImplTest {
+ private Producer producer;
+
+ @Mock
+ private DefaultMQProducer rocketmqProducer;
+
+ /**
+ * Creates the producer through the OMS access point and swaps its RocketMQ producer
+ * (field declared in AbstractOMSProducer) for the mock using reflection.
+ */
+ @Before
+ public void init() throws NoSuchFieldException, IllegalAccessException {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ producer = messagingAccessPoint.createProducer();
+
+ Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+ field.setAccessible(true);
+ field.set(producer, rocketmqProducer);
+
+ messagingAccessPoint.startup();
+ producer.startup();
+ }
+
+ /**
+ * A SEND_OK result from RocketMQ yields an OMS result carrying the same message id.
+ */
+ @Test
+ public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ SendResult sendResult = new SendResult();
+ sendResult.setMsgId("TestMsgID");
+ sendResult.setSendStatus(SendStatus.SEND_OK);
+ when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+ io.openmessaging.SendResult omsResult =
+ producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+
+ assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
+ }
+
+ /**
+ * A result whose status is not SEND_OK must make send() throw with the expected message.
+ */
+ @Test
+ public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ SendResult sendResult = new SendResult();
+ sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
+
+ when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+ try {
+ producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+ // Throws an AssertionError, which the catch (Exception) below does not intercept.
+ failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+ } catch (Exception e) {
+ assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+ }
+ }
+
+ /**
+ * An MQClientException raised by the RocketMQ producer must surface from send() as an
+ * exception with the expected message.
+ */
+ @Test
+ public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
+ try {
+ producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+ // Throws an AssertionError, which the catch (Exception) below does not intercept.
+ failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+ } catch (Exception e) {
+ assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
new file mode 100644
index 0000000..823fe01
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.SequenceProducer;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link SequenceProducer} created by the RocketMQ messaging access point, with
+ * its underlying {@link DefaultMQProducer} replaced by a Mockito mock.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SequenceProducerImplTest {
+
+    private SequenceProducer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    /**
+     * Creates the producer under test, injects the mocked RocketMQ producer into it and starts
+     * both the access point and the producer.
+     */
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createSequenceProducer();
+
+        // Swap in the mock through reflection on the field declared in AbstractOMSProducer.
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    /**
+     * After send + commit, the message header must carry the message id of the result returned
+     * by the mocked list-based send.
+     */
+    @Test
+    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.commit();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
+    }
+
+    /**
+     * After send + rollback, a subsequent commit must leave the message without a message id
+     * header.
+     */
+    @Test
+    public void testRollback() {
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.rollback();
+        producer.commit(); //Commit nothing.
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isNull();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
new file mode 100644
index 0000000..2240ff2
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+public class DefaultPromiseTest {
+ private Promise<String> promise;
+
+ @Before
+ public void init() {
+ promise = new DefaultPromise<>();
+ }
+
+ @Test
+ public void testIsCancelled() throws Exception {
+ assertThat(promise.isCancelled()).isEqualTo(false);
+ }
+
+ @Test
+ public void testIsDone() throws Exception {
+ assertThat(promise.isDone()).isEqualTo(false);
+ promise.set("Done");
+ assertThat(promise.isDone()).isEqualTo(true);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ promise.set("Done");
+ assertThat(promise.get()).isEqualTo("Done");
+ }
+
+ @Test
+ public void testGet_WithTimeout() throws Exception {
+ try {
+ promise.get(100);
+ failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+ } catch (OMSRuntimeException e) {
+ assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
+ }
+ }
+
+ @Test
+ public void testAddListener() throws Exception {
+ promise.addListener(new PromiseListener<String>() {
+ @Override
+ public void operationCompleted(final Promise<String> promise) {
+ assertThat(promise.get()).isEqualTo("Done");
+ }
+
+ @Override
+ public void operationFailed(final Promise<String> promise) {
+
+ }
+ });
+ promise.set("Done");
+ }
+
+ @Test
+ public void testAddListener_ListenerAfterSet() throws Exception {
+ promise.set("Done");
+ promise.addListener(new PromiseListener<String>() {
+ @Override
+ public void operationCompleted(final Promise<String> promise) {
+ assertThat(promise.get()).isEqualTo("Done");
+ }
+
+ @Override
+ public void operationFailed(final Promise<String> promise) {
+
+ }
+ });
+ }
+
+ @Test
+ public void testAddListener_WithException_ListenerAfterSet() throws Exception {
+ final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ promise.setFailure(exception);
+ promise.addListener(new PromiseListener<String>() {
+ @Override
+ public void operationCompleted(final Promise<String> promise) {
+ }
+
+ @Override
+ public void operationFailed(final Promise<String> promise) {
+ assertThat(promise.getThrowable()).isEqualTo(exception);
+ }
+ });
+ }
+
+ @Test
+ public void testAddListener_WithException() throws Exception {
+ final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ promise.addListener(new PromiseListener<String>() {
+ @Override
+ public void operationCompleted(final Promise<String> promise) {
+ }
+
+ @Override
+ public void operationFailed(final Promise<String> promise) {
+ assertThat(promise.getThrowable()).isEqualTo(exception);
+ }
+ });
+ promise.setFailure(exception);
+ }
+
+ @Test
+ public void getThrowable() throws Exception {
+ assertThat(promise.getThrowable()).isNull();
+ Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ promise.setFailure(exception);
+ assertThat(promise.getThrowable()).isEqualTo(exception);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
new file mode 100644
index 0000000..71ca11c
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BeanUtils#populate}: a {@link KeyValue} holding both ClientConfig keys and
+ * keys for properties declared in a subclass is applied to a new instance and to an existing one.
+ */
+public class BeanUtilsTest {
+    private final KeyValue properties = OMS.newKeyValue();
+
+    /**
+     * ClientConfig subclass adding a String, a double and a long property.
+     * NOTE(review): the dotted key constants are presumably mapped by BeanUtils to the matching
+     * setters (e.g. "string.test" to setStringTest) - confirm against BeanUtils.
+     */
+    public static class CustomizedConfig extends ClientConfig {
+        static final String STRING_TEST = "string.test";
+        String stringTest = "foobar";
+
+        static final String DOUBLE_TEST = "double.test";
+        double doubleTest = 123.0;
+
+        static final String LONG_TEST = "long.test";
+        long longTest = 123L;
+
+        String getStringTest() {
+            return stringTest;
+        }
+
+        public void setStringTest(String stringTest) {
+            this.stringTest = stringTest;
+        }
+
+        double getDoubleTest() {
+            return doubleTest;
+        }
+
+        public void setDoubleTest(final double doubleTest) {
+            this.doubleTest = doubleTest;
+        }
+
+        long getLongTest() {
+            return longTest;
+        }
+
+        public void setLongTest(final long longTest) {
+            this.longTest = longTest;
+        }
+
+        CustomizedConfig() {
+        }
+    }
+
+    /**
+     * Fills the key-value store with the values the tests expect to find on the populated config.
+     */
+    @Before
+    public void init() {
+        properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
+        properties.put(CustomizedConfig.STRING_TEST, "kaka");
+        properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
+        properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
+
+        properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
+        properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
+    }
+
+    /** Populating from a class creates an instance carrying every value put in {@link #init()}. */
+    @Test
+    public void testPopulate() {
+        CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
+
+        assertPopulatedFromProperties(config);
+    }
+
+    /** Populating an existing instance applies every value put in {@link #init()} to it. */
+    @Test
+    public void testPopulate_ExistObj() {
+        CustomizedConfig config = new CustomizedConfig();
+        config.setOmsConsumerId("NewConsumerId");
+
+        Assert.assertEquals("NewConsumerId", config.getOmsConsumerId());
+
+        config = BeanUtils.populate(properties, config);
+
+        assertPopulatedFromProperties(config);
+    }
+
+    /**
+     * Checks that the config carries the values stored in {@link #init()}.
+     * Arguments follow JUnit's (expected, actual) order so failure messages read correctly.
+     */
+    private static void assertPopulatedFromProperties(final CustomizedConfig config) {
+        Assert.assertEquals(120, config.getRmqMaxRedeliveryTimes());
+        Assert.assertEquals("kaka", config.getStringTest());
+        Assert.assertEquals("Default_Consumer_Group", config.getRmqConsumerGroup());
+        Assert.assertEquals(101, config.getRmqMessageConsumeTimeout());
+        Assert.assertEquals(1234567890L, config.getLongTest());
+        Assert.assertEquals(10.234, config.getDoubleTest(), 0.000001);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 05ead63..25e4c84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,7 @@
<module>filter</module>
<module>test</module>
<module>distribution</module>
+ <module>openmessaging</module>
</modules>
<build>
@@ -617,6 +618,11 @@
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>0.1.0-alpha</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>