You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/06/20 02:06:09 UTC
[rocketmq] branch feature_oms_1.0.0 updated: [ISSUE #1199]
Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module
(#1240)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch feature_oms_1.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/feature_oms_1.0.0 by this push:
new 246e34e [ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)
246e34e is described below
commit 246e34eb01cdf46db20dccea9f04dc4fae279715
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Jun 20 10:06:04 2019 +0800
[ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)
* Adapt to the new consumer api
* Optimize the implementation of the new consumer API
* Adjust consumer example
* 1. Optimize consumer code implementation
2. Fix bug
* 1. Optimize consumer code implementation
2. Fix bug
* Add new api unit test
* Rename OMSUtil to OMSClientUtil
* Add more verification to the unit tests
---
.../example/openmessaging/SimplePullConsumer.java | 39 +++--
.../example/openmessaging/SimplePushConsumer.java | 25 +--
.../rocketmq/MessagingAccessPointImpl.java | 93 +++--------
.../rocketmq/config/DefaultQueueMetaData.java | 65 --------
.../rocketmq/consumer/LocalMessageCache.java | 62 +++----
.../rocketmq/consumer/PullConsumerImpl.java | 164 ++++++-------------
.../rocketmq/consumer/PushConsumerImpl.java | 179 +++++++--------------
.../rocketmq/domain/BytesMessageImpl.java | 24 ++-
.../rocketmq/domain/DefaultMessageFactory.java | 29 ++++
.../rocketmq/domain/DefaultMessageReceipt.java | 66 ++++++++
.../rocketmq/domain/DefaultQueueMetaData.java | 72 +++++++++
.../rocketmq/domain/MessageExtension.java | 34 ++++
.../rocketmq/domain/MessageExtensionHeader.java | 146 +++++++++++++++++
.../rocketmq/domain/MessageHeader.java | 13 ++
.../rocketmq/domain/NonStandardKeys.java | 4 +-
.../rocketmq/producer/AbstractOMSProducer.java | 2 +-
.../rocketmq/producer/ProducerImpl.java | 38 +++--
.../utils/{OMSUtil.java => OMSClientUtil.java} | 28 +++-
.../rocketmq/consumer/LocalMessageCacheTest.java | 90 +++++++++++
.../rocketmq/consumer/PullConsumerImplTest.java | 155 ++++++++++++++++--
.../rocketmq/consumer/PushConsumerImplTest.java | 101 ++++++++++--
21 files changed, 949 insertions(+), 480 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 86aba41..7d82dc8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -16,40 +16,44 @@
*/
package org.apache.rocketmq.example.openmessaging;
-import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
-import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.Set;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
- messagingAccessPoint.startup();
-
final Producer producer = messagingAccessPoint.createProducer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
- OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
+ OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
- messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
- final String queueName = "TopicTest";
-
- producer.startup();
- Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
+ final String queueName = "OMS_HELLO_TOPIC";
+ producer.start();
+ Message msg = producer.createMessage(queueName, "Hello Open Messaging".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
- producer.shutdown();
+ producer.stop();
- consumer.attachQueue(queueName);
+ Set<String> queueNames = new HashSet<String>(8) {
+ {
+ add(queueName);
+ }
+ };
+ consumer.bindQueue(queueNames);
- consumer.startup();
+ consumer.start();
System.out.printf("Consumer startup OK%n");
// Keep running until we find the one that has just been sent
@@ -57,9 +61,11 @@ public class SimplePullConsumer {
while (!stop) {
Message message = consumer.receive();
if (message != null) {
- String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
+ String msgId = message.header().getMessageId();
System.out.printf("Received one message: %s%n", msgId);
- consumer.ack(msgId);
+ DefaultMessageReceipt defaultMessageReceipt = new DefaultMessageReceipt();
+ defaultMessageReceipt.setMessageId(msgId);
+ consumer.ack(defaultMessageReceipt);
if (!stop) {
stop = msgId.equalsIgnoreCase(sendResult.messageId());
@@ -70,7 +76,6 @@ public class SimplePullConsumer {
}
}
- consumer.shutdown();
- messagingAccessPoint.shutdown();
+ consumer.stop();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 220c132..7ac905a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -16,12 +16,14 @@
*/
package org.apache.rocketmq.example.openmessaging;
-import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
-import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
+import io.openmessaging.message.Message;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.Set;
public class SimplePushConsumer {
public static void main(String[] args) {
@@ -29,28 +31,29 @@ public class SimplePushConsumer {
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PushConsumer consumer = messagingAccessPoint.
- createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
-
- messagingAccessPoint.startup();
- System.out.printf("MessagingAccessPoint startup OK%n");
+ createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- consumer.shutdown();
- messagingAccessPoint.shutdown();
+ consumer.stop();
}
}));
- consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+ Set<String> queueNames = new HashSet<String>(8) {
+ {
+ add("OMS_HELLO_TOPIC");
+ }
+ };
+ consumer.bindQueue(queueNames, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
- System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
+ System.out.printf("Received one message: %s%n", message.header().getMessageId());
context.ack();
}
});
- consumer.startup();
+ consumer.start();
System.out.printf("Consumer startup OK%n");
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index f045a41..715adb4 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -18,24 +18,26 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.consumer.Consumer;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.DefaultMessageFactory;
import io.openmessaging.rocketmq.producer.ProducerImpl;
-import java.util.HashSet;
-import java.util.Set;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties;
+ private final MessageFactory messageFactory;
+
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
this.accessPointProperties = accessPointProperties;
+ this.messageFactory = new DefaultMessageFactory();
}
@Override
@@ -57,77 +59,34 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return null;
}
- @Override public Consumer createConsumer() {
- String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
- String[] nsStrArr = consumerId.split("_");
- if (nsStrArr.length < 2) {
- return new PushConsumerImpl(accessPointProperties);
- }
- if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
- return new PullConsumerImpl(accessPointProperties);
- }
+ @Override public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
- @Override
- public ResourceManager resourceManager() {
- DefaultResourceManager resourceManager = new DefaultResourceManager();
- return resourceManager;
- }
-
- @Override public MessageFactory messageFactory() {
- return null;
+ @Override public PullConsumer createPullConsumer() {
+ return new PullConsumerImpl(accessPointProperties);
}
- class DefaultResourceManager implements ResourceManager {
-
- @Override
- public void createNamespace(String nsName) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
- }
-
- @Override
- public void deleteNamespace(String nsName) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
- }
-
- @Override
- public void switchNamespace(String targetNamespace) {
- accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace);
+ @Override public PushConsumer createPushConsumer(KeyValue attributes) {
+ for (String key : attributes.keySet()) {
+ accessPointProperties.put(key, attributes.getString(key));
}
+ return new PushConsumerImpl(accessPointProperties);
+ }
- @Override
- public Set<String> listNamespaces() {
- return new HashSet<String>() {
- {
- add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
- }
- };
- }
-
- @Override
- public void createQueue(String queueName) {
-
- }
-
- @Override
- public void deleteQueue(String queueName) {
-
- }
-
- @Override
- public Set<String> listQueues(String nsName) {
- return null;
- }
-
- @Override
- public void filter(String queueName, String filterString) {
-
+ @Override public PullConsumer createPullConsumer(KeyValue attributes) {
+ for (String key : attributes.keySet()) {
+ accessPointProperties.put(key, attributes.getString(key));
}
+ return new PullConsumerImpl(accessPointProperties);
+ }
- @Override
- public void routing(String sourceQueue, String targetQueue) {
+ @Override
+ public ResourceManager resourceManager() {
+ return null;
+ }
- }
- };
+ @Override public MessageFactory messageFactory() {
+ return messageFactory;
+ }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java
deleted file mode 100644
index b2695bf..0000000
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.config;
-
-import io.openmessaging.extension.QueueMetaData;
-
-import java.util.List;
-
-public class DefaultQueueMetaData implements QueueMetaData {
-
- private String queueName;
-
- private List<QueueMetaData.Partition> partitions;
-
- public DefaultQueueMetaData(String queueName, List<QueueMetaData.Partition> partitions) {
- this.queueName = queueName;
- this.partitions = partitions;
- }
-
- @Override
- public String queueName() {
- return queueName;
- }
-
- @Override
- public List<QueueMetaData.Partition> partitions() {
- return partitions;
- }
-
- public static class DefaultPartition implements Partition {
-
- public DefaultPartition(int partitionId, String partitonHost) {
- this.partitionId = partitionId;
- this.partitonHost = partitonHost;
- }
-
- private int partitionId;
-
- private String partitonHost;
-
- @Override
- public int partitionId() {
- return partitionId;
- }
-
- @Override
- public String partitonHost() {
- return partitonHost;
- }
- }
-}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index c0f498b..1838983 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -21,9 +21,9 @@ import io.openmessaging.ServiceLifeState;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.rocketmq.config.ClientConfig;
-import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -67,7 +68,7 @@ class LocalMessageCache implements ServiceLifecycle {
this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "OMS_CleanExpireMsgScheduledThread_"));
+ "OMS_CleanExpireMsgScheduledThread_"));
this.currentState = ServiceLifeState.INITIALIZED;
}
@@ -79,7 +80,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!pullOffsetTable.containsKey(remoteQueue)) {
try {
pullOffsetTable.putIfAbsent(remoteQueue,
- rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
+ rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e);
}
@@ -125,19 +126,28 @@ class LocalMessageCache implements ServiceLifecycle {
return null;
}
- List<MessageExt> batchPoll(final KeyValue properties) {
- List<ConsumeRequest> consumeRequests = new ArrayList<>(16);
- int n = consumeRequestCache.drainTo(consumeRequests);
- if (n > 0) {
- List<MessageExt> messageExts = new ArrayList<>(n);
- for (ConsumeRequest consumeRequest : consumeRequests) {
- MessageExt messageExt = consumeRequest.getMessageExt();
- consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
- MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
- consumedRequest.put(messageExt.getMsgId(), consumeRequest);
- messageExts.add(messageExt);
+ List<MessageExt> batchPoll(KeyValue properties) {
+ List<ConsumeRequest> consumeRequests = new ArrayList<>(clientConfig.getRmqPullMessageBatchNums());
+ long timeout = properties.getLong(NonStandardKeys.TIMEOUT);
+ while (timeout >= 0) {
+ int n = consumeRequestCache.drainTo(consumeRequests, clientConfig.getRmqPullMessageBatchNums());
+ if (n > 0) {
+ List<MessageExt> messageExts = new ArrayList<>(n);
+ for (ConsumeRequest consumeRequest : consumeRequests) {
+ MessageExt messageExt = consumeRequest.getMessageExt();
+ consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+ MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+ consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+ messageExts.add(messageExt);
+ }
+ return messageExts;
+ }
+ if (timeout > 0) {
+ LockSupport.parkNanos(timeout * 1000 * 1000);
+ timeout = 0;
+ } else {
+ timeout = -1;
}
- return messageExts;
}
return null;
}
@@ -166,7 +176,7 @@ class LocalMessageCache implements ServiceLifecycle {
private void cleanExpireMsg() {
for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
- .getRebalanceImpl().getProcessQueueTable().entrySet()) {
+ .getRebalanceImpl().getProcessQueueTable().entrySet()) {
ProcessQueue pq = next.getValue();
MessageQueue mq = next.getKey();
ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
@@ -186,7 +196,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
- > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
+ > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
@@ -204,7 +214,7 @@ class LocalMessageCache implements ServiceLifecycle {
try {
rocketmqPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
- msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+ msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
ack(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
@@ -237,7 +247,7 @@ class LocalMessageCache implements ServiceLifecycle {
public void stop() {
this.currentState = ServiceLifeState.STOPPING;
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
- this.currentState = ServiceLifeState.STARTED;
+ this.currentState = ServiceLifeState.STOPPED;
}
@Override
@@ -246,7 +256,7 @@ class LocalMessageCache implements ServiceLifecycle {
}
@Override
- public QueueMetaData getQueueMetaData(String queueName) {
+ public Set<QueueMetaData> getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
@@ -254,16 +264,6 @@ class LocalMessageCache implements ServiceLifecycle {
log.error("A error occurred when get queue metadata.", e);
return null;
}
- List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
- if (null != messageQueues && !messageQueues.isEmpty()) {
- for (MessageQueue messageQueue : messageQueues) {
- QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
- partitions.add(partition);
- }
- } else {
- return null;
- }
- QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
- return queueMetaData;
+ return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 03ff901..f4efa92 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,10 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.ServiceLifeState;
-import io.openmessaging.consumer.BatchMessageListener;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
@@ -31,10 +29,13 @@ import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -55,10 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
-public class PullConsumerImpl implements Consumer {
-
- private static final int PULL_MAX_NUMS = 32;
- private static final int PULL_MIN_NUMS = 1;
+public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
@@ -67,7 +65,8 @@ public class PullConsumerImpl implements Consumer {
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
private ServiceLifeState currentState;
- private List<ConsumerInterceptor> consumerInterceptors;
+ private final List<ConsumerInterceptor> consumerInterceptors;
+ private final Extension extension;
private final static InternalLogger log = ClientLogger.getLog();
@@ -96,15 +95,15 @@ public class PullConsumerImpl implements Consumer {
int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
- String consumerId = OMSUtil.buildInstanceName();
+ String consumerId = OMSClientUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
-
consumerInterceptors = new ArrayList<>(16);
+ this.extension = new MessageExtension(this);
}
private void registerPullTaskCallback(final String targetQueueName) {
@@ -139,96 +138,36 @@ public class PullConsumerImpl implements Consumer {
});
}
- @Override
- public void resume() {
- currentState = ServiceLifeState.STARTED;
- }
-
- @Override
- public void suspend() {
- currentState = ServiceLifeState.STOPPED;
+ @Override public Set<String> getBindQueues() {
+ return rocketmqPullConsumer.getRegisterTopics();
}
@Override
- public void suspend(long timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isSuspended() {
- if (ServiceLifeState.STOPPED.equals(currentState)) {
- return true;
- }
- return false;
+ public void addInterceptor(ConsumerInterceptor interceptor) {
+ consumerInterceptors.add(interceptor);
}
@Override
- public void bindQueue(String queueName) {
- registerPullTaskCallback(queueName);
+ public void removeInterceptor(ConsumerInterceptor interceptor) {
+ consumerInterceptors.remove(interceptor);
}
- @Override
- public void bindQueue(List<String> queueNames) {
+ @Override public void bindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
- bindQueue(queueName);
+ registerPullTaskCallback(queueName);
}
}
- @Override
- public void bindQueue(String queueName, MessageListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void bindQueues(List<String> queueNames, MessageListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void bindQueue(String queueName, BatchMessageListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void unbindQueue(String queueName) {
- this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
- }
-
- @Override
- public void unbindQueues(List<String> queueNames) {
+ @Override public void unbindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
}
}
- @Override
- public boolean isBindQueue() {
- Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
- if (null == registerTopics || registerTopics.isEmpty()) {
- return false;
- }
- return true;
- }
-
- @Override
- public List<String> getBindQueues() {
- Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
- return new ArrayList<>(registerTopics);
- }
-
- @Override
- public void addInterceptor(ConsumerInterceptor interceptor) {
- consumerInterceptors.add(interceptor);
- }
-
- @Override
- public void removeInterceptor(ConsumerInterceptor interceptor) {
- consumerInterceptors.remove(interceptor);
+ @Override public Message receive() {
+ KeyValue properties = new DefaultKeyValue();
+ MessageExt rmqMsg = localMessageCache.poll(properties);
+ return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
}
@Override
@@ -236,23 +175,24 @@ public class PullConsumerImpl implements Consumer {
KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, timeout);
MessageExt rmqMsg = localMessageCache.poll(properties);
- return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+ return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
}
@Override
- public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
- MessageQueue mq = null;
- mq = getQueue(queueName, partitionId, mq);
- PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS);
- if (pullResult == null)
+ public Message receive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, long timeout) {
+ MessageQueue mq;
+ mq = getQueue(queueMetaData);
+ PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, NonStandardKeys.PULL_MIN_NUMS);
+ if (pullResult == null) {
return null;
+ }
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
- BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+ BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages.get(0);
@@ -261,10 +201,10 @@ public class PullConsumerImpl implements Consumer {
return null;
}
- private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) {
+ private PullResult getResult(long offset, long timeout, MessageQueue mq, int maxNums) {
PullResult pullResult;
try {
- pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout);
+ pullResult = rocketmqPullConsumer.pull(mq, "*", offset, maxNums, timeout);
} catch (MQClientException e) {
log.error("A error occurred when pull message.", e);
return null;
@@ -278,17 +218,15 @@ public class PullConsumerImpl implements Consumer {
log.error("A error occurred when pull message.", e);
return null;
}
- if (null == pullResult) {
- return null;
- }
return pullResult;
}
- private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) {
+ private MessageQueue getQueue(QueueMetaData queueMetaData) {
+ MessageQueue mq = null;
try {
- Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
+ Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueMetaData.queueName());
for (MessageQueue messageQueue : messageQueues) {
- if (messageQueue.getQueueId() == partitionId) {
+ if (messageQueue.getQueueId() == queueMetaData.partitionId()) {
mq = messageQueue;
}
}
@@ -306,7 +244,7 @@ public class PullConsumerImpl implements Consumer {
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
List<Message> messages = new ArrayList<>(rmqMsgs.size());
for (MessageExt messageExt : rmqMsgs) {
- BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+ BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
@@ -315,19 +253,21 @@ public class PullConsumerImpl implements Consumer {
}
@Override
- public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
- MessageQueue mq = null;
- mq = getQueue(queueName, partitionId, mq);
- PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS);
- if (pullResult == null)
+ public List<Message> batchReceive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt,
+ long timeout) {
+ MessageQueue mq;
+ mq = getQueue(queueMetaData);
+ PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, clientConfig.getRmqPullMessageBatchNums());
+ if (pullResult == null) {
return null;
+ }
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
- BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
+ BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
@@ -338,18 +278,12 @@ public class PullConsumerImpl implements Consumer {
@Override
public void ack(MessageReceipt receipt) {
-
+ localMessageCache.ack(((DefaultMessageReceipt) receipt).getMessageId());
}
@Override
public Optional<Extension> getExtension() {
-
- return Optional.of(new Extension() {
- @Override
- public QueueMetaData getQueueMetaData(String queueName) {
- return getQueueMetaData(queueName);
- }
- });
+ return Optional.of(extension);
}
@Override
@@ -381,7 +315,7 @@ public class PullConsumerImpl implements Consumer {
}
@Override
- public QueueMetaData getQueueMetaData(String queueName) {
+ public Set<QueueMetaData> getQueueMetaData(String queueName) {
return localMessageCache.getQueueMetaData(queueName);
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 8d55b57..4576b28 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -20,27 +20,30 @@ import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.consumer.BatchMessageListener;
-import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
-import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -48,12 +51,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
-public class PushConsumerImpl implements Consumer {
+public class PushConsumerImpl implements PushConsumer {
private final static InternalLogger log = ClientLogger.getLog();
@@ -65,6 +69,8 @@ public class PushConsumerImpl implements Consumer {
private final ClientConfig clientConfig;
private ServiceLifeState currentState;
private List<ConsumerInterceptor> consumerInterceptors;
+ private ScheduledExecutorService scheduledExecutorService;
+ private final Extension extension;
public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
@@ -89,7 +95,7 @@ public class PushConsumerImpl implements Consumer {
this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
- String consumerId = OMSUtil.buildInstanceName();
+ String consumerId = OMSClientUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
@@ -97,6 +103,9 @@ public class PushConsumerImpl implements Consumer {
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
consumerInterceptors = new ArrayList<>(16);
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+ "OMS_SuspendTimeouThread_"));
+ extension = new MessageExtension(this);
currentState = ServiceLifeState.INITIALIZED;
}
@@ -112,7 +121,12 @@ public class PushConsumerImpl implements Consumer {
@Override
public void suspend(long timeout) {
- throw new UnsupportedOperationException();
+ this.rocketmqPushConsumer.suspend();
+ scheduledExecutorService.schedule(new Runnable() {
+ @Override public void run() {
+ PushConsumerImpl.this.rocketmqPushConsumer.resume();
+ }
+ }, timeout, TimeUnit.MILLISECONDS);
}
@Override
@@ -120,90 +134,49 @@ public class PushConsumerImpl implements Consumer {
return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
}
- @Override
- public void bindQueue(String queueName) {
- try {
- rocketmqPushConsumer.subscribe(queueName, "*");
- } catch (MQClientException e) {
- throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
- }
- }
-
- @Override
- public void bindQueue(List<String> queueNames) {
+ @Override public void bindQueue(Collection<String> queueNames, MessageListener listener) {
for (String queueName : queueNames) {
- bindQueue(queueName);
- }
- }
-
- @Override
- public void bindQueue(String queueName, MessageListener listener) {
- this.subscribeTable.put(queueName, listener);
- this.batchSubscribeTable.remove(queueName);
- try {
- this.rocketmqPushConsumer.subscribe(queueName, "*");
- } catch (MQClientException e) {
- throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
- }
- }
-
- @Override
- public void bindQueues(List<String> queueNames, MessageListener listener) {
- for (String queueName : queueNames) {
- bindQueue(queueName, listener);
- }
- }
-
- @Override
- public void bindQueue(String queueName, BatchMessageListener listener) {
- this.batchSubscribeTable.put(queueName, listener);
- this.subscribeTable.remove(queueName);
- try {
- this.rocketmqPushConsumer.subscribe(queueName, "*");
- } catch (MQClientException e) {
- throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+ this.subscribeTable.put(queueName, listener);
+ this.batchSubscribeTable.remove(queueName);
+ this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(NonStandardKeys.PULL_MIN_NUMS);
+ try {
+ this.rocketmqPushConsumer.subscribe(queueName, "*");
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+ }
}
}
- @Override
- public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
+ @Override public void bindQueue(Collection<String> queueNames, BatchMessageListener listener) {
for (String queueName : queueNames) {
- bindQueue(queueName, listener);
- }
- }
-
- @Override
- public void unbindQueue(String queueName) {
- this.subscribeTable.remove(queueName);
- this.batchSubscribeTable.remove(queueName);
- try {
- this.rocketmqPushConsumer.unsubscribe(queueName);
- } catch (Exception e) {
- throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
+ this.batchSubscribeTable.put(queueName, listener);
+ this.subscribeTable.remove(queueName);
+ this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(clientConfig.getRmqPullMessageBatchNums());
+ try {
+ this.rocketmqPushConsumer.subscribe(queueName, "*");
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
+ }
}
}
- @Override
- public void unbindQueues(List<String> queueNames) {
+ @Override public void unbindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
- unbindQueue(queueName);
- }
- }
-
- @Override
- public boolean isBindQueue() {
- Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
- if (null != subscription && subscription.size() > 0) {
- return true;
+ this.subscribeTable.remove(queueName);
+ this.batchSubscribeTable.remove(queueName);
+ try {
+ this.rocketmqPushConsumer.unsubscribe(queueName);
+ } catch (Exception e) {
+ throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
+ }
}
- return false;
}
@Override
- public List<String> getBindQueues() {
+ public Set<String> getBindQueues() {
Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
if (null != subscription && subscription.size() > 0) {
- return new ArrayList<>(subscription.keySet());
+ return subscription.keySet();
}
return null;
}
@@ -219,39 +192,13 @@ public class PushConsumerImpl implements Consumer {
}
@Override
- public Message receive(long timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<Message> batchReceive(long timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void ack(MessageReceipt receipt) {
throw new UnsupportedOperationException();
}
@Override
public Optional<Extension> getExtension() {
- return Optional.of(new Extension() {
-
- @Override
- public QueueMetaData getQueueMetaData(String queueName) {
- return getQueueMetaData(queueName);
- }
- });
+ return Optional.of(extension);
}
@Override
@@ -284,7 +231,7 @@ public class PushConsumerImpl implements Consumer {
}
@Override
- public QueueMetaData getQueueMetaData(String queueName) {
+ public Set<QueueMetaData> getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPushConsumer.fetchSubscribeMessageQueues(queueName);
@@ -292,24 +239,14 @@ public class PushConsumerImpl implements Consumer {
log.error("A error occurred when get queue metadata.", e);
return null;
}
- List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
- if (null != messageQueues && !messageQueues.isEmpty()) {
- for (MessageQueue messageQueue : messageQueues) {
- QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
- partitions.add(partition);
- }
- } else {
- return null;
- }
- QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
- return queueMetaData;
+ return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
class MessageListenerImpl implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
- ConsumeConcurrentlyContext contextRMQ) {
+ ConsumeConcurrentlyContext contextRMQ) {
boolean batchFlag = true;
MessageExt rmqMsg = rmqMsgList.get(0);
BatchMessageListener batchMessageListener = PushConsumerImpl.this.batchSubscribeTable.get(rmqMsg.getTopic());
@@ -319,14 +256,14 @@ public class PushConsumerImpl implements Consumer {
}
if (listener == null && batchMessageListener == null) {
throw new OMSRuntimeException(-1,
- String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+ String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
}
final KeyValue contextProperties = OMS.newKeyValue();
if (batchFlag) {
- List<Message> messages = new ArrayList<>(16);
+ List<Message> messages = new ArrayList<>(32);
for (MessageExt messageExt : rmqMsgList) {
- BytesMessageImpl omsMsg = OMSUtil.msgConvert(messageExt);
+ BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(messageExt);
messages.add(omsMsg);
}
final CountDownLatch sync = new CountDownLatch(1);
@@ -344,7 +281,7 @@ public class PushConsumerImpl implements Consumer {
public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
@@ -356,7 +293,7 @@ public class PushConsumerImpl implements Consumer {
} catch (InterruptedException ignore) {
}
} else {
- BytesMessageImpl omsMsg = OMSUtil.msgConvert(rmqMsg);
+ BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(rmqMsg);
final CountDownLatch sync = new CountDownLatch(1);
@@ -368,7 +305,7 @@ public class PushConsumerImpl implements Consumer {
public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index f1405b2..b5da5ce 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -17,22 +17,26 @@
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.extension.ExtensionHeader;
import io.openmessaging.message.Header;
import io.openmessaging.message.Message;
-import io.openmessaging.OMS;
-import java.util.Optional;
+import java.util.Arrays;
public class BytesMessageImpl implements Message {
private Header sysHeaders;
+ private ExtensionHeader extensionHeader;
+ private MessageReceipt messageReceipt;
private KeyValue userProperties;
private byte[] data;
public BytesMessageImpl() {
this.sysHeaders = new MessageHeader();
this.userProperties = OMS.newKeyValue();
+ this.extensionHeader = new MessageExtensionHeader();
+ this.messageReceipt = new DefaultMessageReceipt();
}
@Override
@@ -41,8 +45,8 @@ public class BytesMessageImpl implements Message {
}
@Override
- public Optional<ExtensionHeader> extensionHeader() {
- return null;
+ public ExtensionHeader extensionHeader() {
+ return extensionHeader;
}
@Override
@@ -62,6 +66,16 @@ public class BytesMessageImpl implements Message {
@Override
public MessageReceipt getMessageReceipt() {
- return null;
+ return messageReceipt;
+ }
+
+ @Override public String toString() {
+ return "BytesMessageImpl{" +
+ "sysHeaders=" + sysHeaders +
+ ", extensionHeader=" + extensionHeader +
+ ", messageReceipt=" + messageReceipt +
+ ", userProperties=" + userProperties +
+ ", data=" + Arrays.toString(data) +
+ '}';
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java
new file mode 100644
index 0000000..b3baeb2
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.message.Message;
+import io.openmessaging.message.MessageFactory;
+
+public class DefaultMessageFactory implements MessageFactory { // MessageFactory whose messages are BytesMessageImpl instances
+ @Override public Message createMessage(String queueName, byte[] body) { // builds a new message on every call
+ Message message = new BytesMessageImpl();
+ message.setData(body); // body is passed through as-is; no null check is done here
+ message.header().setDestination(queueName); // the queue name is stored as the header's destination
+ return message;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java
new file mode 100644
index 0000000..9339006
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.consumer.MessageReceipt;
+import java.util.Objects;
+
+/**
+ * {@link MessageReceipt} implementation that carries a message id together with an offset.
+ *
+ * <p>Two receipts are equal when both the message id and the offset match.
+ */
+public class DefaultMessageReceipt implements MessageReceipt {
+
+    private long offset;
+
+    private String messageId;
+
+    /** Creates an empty receipt: offset {@code 0} and a {@code null} message id. */
+    public DefaultMessageReceipt() {
+
+    }
+
+    /**
+     * @param messageId id of the message this receipt refers to
+     * @param offset offset stored alongside the message id
+     */
+    public DefaultMessageReceipt(String messageId, long offset) {
+        this.messageId = messageId;
+        this.offset = offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultMessageReceipt receipt = (DefaultMessageReceipt) o;
+        return offset == receipt.offset &&
+            Objects.equals(messageId, receipt.messageId);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(offset, messageId);
+    }
+
+    /** Readable form, so that a message embedding this receipt in its own {@code toString()} shows real values. */
+    @Override public String toString() {
+        return "DefaultMessageReceipt{" +
+            "offset=" + offset +
+            ", messageId='" + messageId + '\'' +
+            '}';
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java
new file mode 100644
index 0000000..2958f96
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.QueueMetaData;
+import java.util.Objects;
+
+/**
+ * {@link QueueMetaData} implementation that holds a queue name and a partition id.
+ */
+public class DefaultQueueMetaData implements QueueMetaData {
+
+    private String queueName;
+
+    private int partitionId;
+
+    public DefaultQueueMetaData(String queueName, int partitionId) {
+        this.queueName = queueName;
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Replaces the queue name.
+     *
+     * @param queueName the new queue name
+     */
+    @Override public void setQueueName(String queueName) {
+        // The parameter shadows the field on purpose; "this." is what makes the caller's value land in the field.
+        this.queueName = queueName;
+    }
+
+    @Override public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    @Override public int partitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public String queueName() {
+        return queueName;
+    }
+
+    /**
+     * Equal when the other object is a {@code DefaultQueueMetaData} with the same queue name and
+     * partition id. A {@code null} queue name is compared null-safely, matching {@link #hashCode()}.
+     */
+    @Override public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultQueueMetaData data = (DefaultQueueMetaData) o;
+        return partitionId == data.partitionId &&
+            Objects.equals(queueName, data.queueName);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hash(queueName, partitionId);
+    }
+
+    @Override public String toString() {
+        return "DefaultQueueMetaData{" +
+            "queueName='" + queueName + '\'' +
+            ", partitionId=" + partitionId +
+            '}';
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java
new file mode 100644
index 0000000..7449827
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.Extension;
+import io.openmessaging.extension.QueueMetaData;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * {@link Extension} that forwards {@link #getQueueMetaData(String)} to another {@link Extension}.
+ */
+public class MessageExtension implements Extension {
+
+    private final Extension extension;
+
+    /**
+     * @param extension the delegate that answers the queries; must not be {@code null}
+     * @throws NullPointerException if {@code extension} is {@code null}
+     */
+    public MessageExtension(Extension extension) {
+        // Fail at construction time rather than on the first delegated call.
+        this.extension = Objects.requireNonNull(extension, "extension");
+    }
+
+    @Override public Set<QueueMetaData> getQueueMetaData(String queueName) {
+        return extension.getQueueMetaData(queueName);
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java
new file mode 100644
index 0000000..3f103a4
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.extension.ExtensionHeader;
+
+public class MessageExtensionHeader implements ExtensionHeader { // field-backed ExtensionHeader; each setter stores its argument and returns this
+ private int partition;
+
+ private long offset;
+
+ private String correlationId;
+
+ private String transactionId;
+
+ private long storeTimestamp;
+
+ private String storeHost;
+
+ private String messageKey;
+
+ private String traceId;
+
+ private long delayTime;
+
+ private long expireTime;
+
+ @Override public ExtensionHeader setPartition(int partition) {
+ this.partition = partition;
+ return this;
+ }
+
+ @Override public ExtensionHeader setOffset(long offset) {
+ this.offset = offset;
+ return this;
+ }
+
+ @Override public ExtensionHeader setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ return this;
+ }
+
+ @Override public ExtensionHeader setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ return this;
+ }
+
+ @Override public ExtensionHeader setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ return this;
+ }
+
+ @Override public ExtensionHeader setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ return this;
+ }
+
+ @Override public ExtensionHeader setMessageKey(String messageKey) {
+ this.messageKey = messageKey;
+ return this;
+ }
+
+ @Override public ExtensionHeader setTraceId(String traceId) {
+ this.traceId = traceId;
+ return this;
+ }
+
+ @Override public ExtensionHeader setDelayTime(long delayTime) {
+ this.delayTime = delayTime;
+ return this;
+ }
+
+ @Override public ExtensionHeader setExpireTime(long expireTime) {
+ this.expireTime = expireTime;
+ return this;
+ }
+
+ @Override public int getPartiton() { // "Partiton" (sic): the name is fixed by the overridden ExtensionHeader method
+ return partition;
+ }
+
+ @Override public long getOffset() {
+ return offset;
+ }
+
+ @Override public String getCorrelationId() {
+ return correlationId;
+ }
+
+ @Override public String getTransactionId() {
+ return transactionId;
+ }
+
+ @Override public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ @Override public String getStoreHost() {
+ return storeHost;
+ }
+
+ @Override public long getDelayTime() {
+ return delayTime;
+ }
+
+ @Override public long getExpireTime() {
+ return expireTime;
+ }
+
+ @Override public String getMessageKey() {
+ return messageKey;
+ }
+
+ @Override public String getTraceId() {
+ return traceId;
+ }
+
+ @Override public String toString() { // prints all ten fields; String-typed ones are wrapped in single quotes
+ return "MessageExtensionHeader{" +
+ "partition=" + partition +
+ ", offset=" + offset +
+ ", correlationId='" + correlationId + '\'' +
+ ", transactionId='" + transactionId + '\'' +
+ ", storeTimestamp=" + storeTimestamp +
+ ", storeHost='" + storeHost + '\'' +
+ ", messageKey='" + messageKey + '\'' +
+ ", traceId='" + traceId + '\'' +
+ ", delayTime=" + delayTime +
+ ", expireTime=" + expireTime +
+ '}';
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
index a6e7585..495dc1a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
@@ -110,4 +110,17 @@ public class MessageHeader implements Header {
@Override public short getCompression() {
return this.compression;
}
+
+ @Override public String toString() {
+ return "MessageHeader{" +
+ "destination='" + destination + '\'' +
+ ", messageId='" + messageId + '\'' +
+ ", bornTimestamp=" + bornTimestamp +
+ ", bornHost='" + bornHost + '\'' +
+ ", priority=" + priority +
+ ", deliveryCount=" + deliveryCount +
+ ", compression=" + compression +
+ ", durability=" + durability +
+ '}';
+ }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 16ecb0d..2d0d3e6 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -30,7 +30,5 @@ public interface NonStandardKeys {
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID = "CONSUMER_ID";
String TIMEOUT = "TIMEOUT";
- String PULL_CONSUMER = "PULL";
- String PUSH_CONSUMER = "PUSH";
-
+ int PULL_MIN_NUMS = 1;
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 63034e3..735bace 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
-import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
+import static io.openmessaging.rocketmq.utils.OMSClientUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static InternalLogger log = ClientLogger.getLog();
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index d3acce2..a44a0f7 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -18,31 +18,38 @@ package io.openmessaging.rocketmq.producer;
import io.openmessaging.Future;
import io.openmessaging.KeyValue;
+import io.openmessaging.Promise;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
-import io.openmessaging.message.Message;
-import io.openmessaging.Promise;
-import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.producer.TransactionalResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.promise.DefaultPromise;
-import io.openmessaging.rocketmq.utils.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.MessageQueue;
-import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
+import static io.openmessaging.rocketmq.utils.OMSClientUtil.msgConvert;
public class ProducerImpl extends AbstractOMSProducer implements Producer {
+ private final Extension extension;
+
public ProducerImpl(final KeyValue properties) {
super(properties);
+ extension = new MessageExtension(this);
}
@Override
@@ -60,7 +67,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
}
message.header().setMessageId(rmqResult.getMsgId());
- return OMSUtil.sendResultConvert(rmqResult);
+ return OMSClientUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
@@ -81,7 +88,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.header().setMessageId(rmqResult.getMsgId());
- promise.set(OMSUtil.sendResultConvert(rmqResult));
+ promise.set(OMSClientUtil.sendResultConvert(rmqResult));
}
@Override
@@ -112,7 +119,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
for (Message message : messages) {
- sendOneway(messages);
+ sendOneway(message);
}
}
@@ -128,7 +135,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
for (Message message : messages) {
- sendOneway(messages);
+ sendOneway(message);
}
}
@@ -152,12 +159,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public Optional<Extension> getExtension() {
- return null;
+ return Optional.of(extension);
}
@Override
- public QueueMetaData getQueueMetaData(String queueName) {
- return null;
+ public Set<QueueMetaData> getQueueMetaData(String queueName) {
+ List<MessageQueue> messageQueues;
+ try {
+ messageQueues = this.rocketmqProducer.fetchPublishMessageQueues(queueName);
+ } catch (MQClientException e) {
+ log.error("A error occurred when get queue metadata.", e);
+ return null;
+ }
+ return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
@Override
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
similarity index 77%
rename from openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
rename to openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
index 5b095ee..ab3ff34 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java
@@ -18,19 +18,24 @@ package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
+import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Header;
import io.openmessaging.producer.SendResult;
+import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageQueue;
-public class OMSUtil {
+public class OMSClientUtil {
/**
* Builds a OMS client instance name.
@@ -56,7 +61,6 @@ public class OMSUtil {
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
-
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
@@ -82,6 +86,13 @@ public class OMSUtil {
omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
+ omsMsg.extensionHeader().setPartition(rmqMsg.getQueueId());
+ omsMsg.extensionHeader().setOffset(rmqMsg.getQueueOffset());
+ omsMsg.extensionHeader().setDelayTime(rmqMsg.getDelayTimeLevel());
+ omsMsg.extensionHeader().setMessageKey(rmqMsg.getKeys());
+ omsMsg.extensionHeader().setStoreHost(rmqMsg.getStoreHost().toString());
+ omsMsg.extensionHeader().setStoreTimestamp(rmqMsg.getStoreTimestamp());
+ omsMsg.extensionHeader().setTransactionId(rmqMsg.getTransactionId());
return omsMsg;
}
@@ -116,4 +127,17 @@ public class OMSUtil {
}
return keyValue;
}
+
+ /**
+  * Converts RocketMQ message queues into OMS queue metadata.
+  *
+  * @param messageQueues the RocketMQ queues to convert; may be {@code null}
+  * @return a set holding a {@link DefaultQueueMetaData} built from the topic and queue id of
+  * each given queue, or {@code null} when {@code messageQueues} is {@code null} or empty
+  */
+ public static Set<QueueMetaData> queueMetaDataConvert(Collection<MessageQueue> messageQueues) {
+     // Callers receive null (not an empty set) when there is nothing to convert.
+     if (null == messageQueues || messageQueues.isEmpty()) {
+         return null;
+     }
+     Set<QueueMetaData> converted = new HashSet<>(32);
+     for (MessageQueue queue : messageQueues) {
+         converted.add(new DefaultQueueMetaData(queue.getTopic(), queue.getQueueId()));
+     }
+     return converted;
+ }
}
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
index 851c283..2a27076 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -16,10 +16,17 @@
*/
package io.openmessaging.rocketmq.consumer;
+import io.openmessaging.KeyValue;
+import io.openmessaging.extension.QueueMetaData;
+import io.openmessaging.internal.DefaultKeyValue;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Before;
@@ -40,6 +47,8 @@ public class LocalMessageCacheTest {
private DefaultMQPullConsumer rocketmqPullConsume;
@Mock
private ConsumeRequest consumeRequest;
+ @Mock
+ private ConsumeRequest consumeRequest1;
@Before
public void init() {
@@ -86,4 +95,85 @@ public class LocalMessageCacheTest {
localMessageCache.submitConsumeRequest(consumeRequest);
assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
}
+
+ /**
+  * Submits two consume requests to the cache and checks that a single batchPoll call
+  * hands back both messages with their bodies and topics intact.
+  */
+ @Test
+ public void testBatchPollMessage() throws Exception {
+     byte[] firstBody = new byte[] {'1', '2', '3'};
+     MessageExt firstMsg = new MessageExt();
+     firstMsg.setMsgId("NewMsgId");
+     firstMsg.setBody(firstBody);
+     firstMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     firstMsg.setTopic("HELLO_QUEUE");
+
+     byte[] secondBody = new byte[] {'4', '5', '6'};
+     MessageExt secondMsg = new MessageExt();
+     secondMsg.setMsgId("NewMsgId1");
+     secondMsg.setBody(secondBody);
+     secondMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     secondMsg.setTopic("HELLO_QUEUE1");
+
+     // Each mocked request carries exactly one of the messages above.
+     when(consumeRequest.getMessageExt()).thenReturn(firstMsg);
+     when(consumeRequest1.getMessageExt()).thenReturn(secondMsg);
+     localMessageCache.submitConsumeRequest(consumeRequest);
+     localMessageCache.submitConsumeRequest(consumeRequest1);
+
+     KeyValue pollProperties = new DefaultKeyValue();
+     pollProperties.put(NonStandardKeys.TIMEOUT, 3000);
+     List<MessageExt> polledMsgs = localMessageCache.batchPoll(pollProperties);
+     assertThat(polledMsgs.size()).isEqualTo(2);
+
+     // The test does not rely on the order of the batch, so pick the messages out by id.
+     MessageExt polledFirst = null;
+     MessageExt polledSecond = null;
+     for (MessageExt polled : polledMsgs) {
+         String msgId = polled.getMsgId();
+         if (msgId.equals("NewMsgId")) {
+             polledFirst = polled;
+         } else if (msgId.equals("NewMsgId1")) {
+             polledSecond = polled;
+         }
+     }
+
+     assertThat(polledFirst).isNotNull();
+     assertThat(polledFirst.getBody()).isEqualTo(firstBody);
+     assertThat(polledFirst.getTopic()).isEqualTo("HELLO_QUEUE");
+     assertThat(polledSecond).isNotNull();
+     assertThat(polledSecond.getBody()).isEqualTo(secondBody);
+     assertThat(polledSecond.getTopic()).isEqualTo("HELLO_QUEUE1");
+ }
+
+ /**
+  * Stubs the pull consumer to report three queues for "topic1" and checks that the cache
+  * exposes one QueueMetaData per partition id, each carrying the topic as its queue name.
+  *
+  * @throws MQClientException declared by the stubbed fetchSubscribeMessageQueues call
+  */
+ @Test
+ public void getQueueMetaData() throws MQClientException {
+     // A plain HashSet replaces the former double-brace initialization, which created an
+     // anonymous HashSet subclass holding a reference to the enclosing test instance.
+     Set<MessageQueue> messageQueues = new HashSet<>();
+     messageQueues.add(new MessageQueue("topic1", "brockerName1", 0));
+     messageQueues.add(new MessageQueue("topic1", "brockerName2", 1));
+     messageQueues.add(new MessageQueue("topic1", "brockerName3", 2));
+
+     when(rocketmqPullConsume.fetchSubscribeMessageQueues("topic1")).thenReturn(messageQueues);
+     Set<QueueMetaData> queueMetaDatas = localMessageCache.getQueueMetaData("topic1");
+     assertThat(queueMetaDatas.size()).isEqualTo(3);
+
+     // The result is a Set, so locate each entry by its partition id instead of by position.
+     QueueMetaData queueMetaData1 = null;
+     QueueMetaData queueMetaData2 = null;
+     QueueMetaData queueMetaData3 = null;
+     for (QueueMetaData queueMetaData : queueMetaDatas) {
+         if (queueMetaData.partitionId() == 0) {
+             queueMetaData1 = queueMetaData;
+         }
+         if (queueMetaData.partitionId() == 1) {
+             queueMetaData2 = queueMetaData;
+         }
+         if (queueMetaData.partitionId() == 2) {
+             queueMetaData3 = queueMetaData;
+         }
+     }
+     assertThat(queueMetaData1).isNotNull();
+     assertThat(queueMetaData1.queueName()).isEqualTo("topic1");
+     assertThat(queueMetaData2).isNotNull();
+     assertThat(queueMetaData2.queueName()).isEqualTo("topic1");
+     assertThat(queueMetaData3).isNotNull();
+     assertThat(queueMetaData3.queueName()).isEqualTo("topic1");
+ }
}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 7cb5030..1f61b34 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -19,14 +19,28 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
-import io.openmessaging.consumer.Consumer;
-import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
+import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,12 +49,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest {
- private Consumer consumer;
+ private PullConsumer pullConsumer;
private String queueName = "HELLO_QUEUE";
@Mock
@@ -50,15 +67,17 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
- .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
- final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
- resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup");
- consumer = messagingAccessPoint.createConsumer();
- consumer.bindQueue(queueName);
+ .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
+ final KeyValue attributes = messagingAccessPoint.attributes();
+ attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
+ pullConsumer = messagingAccessPoint.createPullConsumer();
+ Set<String> queueNames = new HashSet<>(8);
+ queueNames.add(queueName);
+ pullConsumer.bindQueue(queueNames);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
- field.set(consumer, rocketmqPullConsumer); //Replace
+ field.set(pullConsumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig();
clientConfig.setOperationTimeout(200);
@@ -66,27 +85,133 @@ public class PullConsumerImplTest {
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true);
- field.set(consumer, localMessageCache);
- consumer.start();
+ field.set(pullConsumer, localMessageCache);
+ pullConsumer.start();
}
@Test
- public void testPoll() {
- final byte[] testBody = new byte[]{'a', 'b'};
+ public void testPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
+ consumedMsg.setQueueId(0);
+ consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class));
- Message message = consumer.receive(3 * 1000);
+ Message message = pullConsumer.receive(3 * 1000);
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(message.getData()).isEqualTo(testBody);
+
+ List<MessageExt> messageExts = new ArrayList<MessageExt>() {
+ {
+ add(consumedMsg);
+ }
+ };
+ PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
+ doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
+ MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0);
+ Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
+ {
+ add(messageQueue);
+ }
+ };
+ doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
+ QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
+ MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
+ long timeout = 3000L;
+ Message message1 = pullConsumer.receive(queueName, queueMetaData, messageReceipt, timeout);
+ assertThat(message1.header().getMessageId()).isEqualTo("NewMsgId");
+ assertThat(message1.getData()).isEqualTo(testBody);
+ assertThat(message1.header().getDestination()).isEqualTo(queueName);
+ assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+ }
+
+ /**
+  * Exercises both batchReceive overloads of the pull consumer.
+  *
+  * The first half stubs the local message cache and calls batchReceive(timeout); the second
+  * half stubs the underlying RocketMQ pull consumer and calls the overload that takes a queue
+  * name, queue metadata and a message receipt. Both halves expect the same two messages back.
+  */
+ @Test
+ public void testBatchPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+     final byte[] testBody = new byte[] {'a', 'b'};
+     MessageExt consumedMsg = new MessageExt();
+     consumedMsg.setMsgId("NewMsgId");
+     consumedMsg.setBody(testBody);
+     consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     consumedMsg.setTopic(queueName);
+     consumedMsg.setQueueId(0);
+     consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+
+     final byte[] testBody1 = new byte[] {'c', 'd'};
+     MessageExt consumedMsg1 = new MessageExt();
+     consumedMsg1.setMsgId("NewMsgId1");
+     consumedMsg1.setBody(testBody1);
+     consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     consumedMsg1.setTopic(queueName);
+     consumedMsg1.setQueueId(0);
+     consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+
+     // A plain ArrayList replaces the former double-brace initialization, which created an
+     // anonymous subclass holding a reference to the enclosing test instance.
+     List<MessageExt> messageExts = new ArrayList<>();
+     messageExts.add(consumedMsg);
+     messageExts.add(consumedMsg1);
+
+     // Part 1: batch receive served from the (stubbed) local message cache.
+     doReturn(messageExts).when(localMessageCache).batchPoll(any(KeyValue.class));
+     List<Message> messages = pullConsumer.batchReceive(3 * 1000);
+
+     Message message1 = null;
+     Message message2 = null;
+     assertThat(messages.size()).isEqualTo(2);
+     for (Message message : messages) {
+         if (message.header().getMessageId().equals("NewMsgId")) {
+             message1 = message;
+         }
+         if (message.header().getMessageId().equals("NewMsgId1")) {
+             message2 = message;
+         }
+     }
+     assertThat(message1).isNotNull();
+     assertThat(message1.getData()).isEqualTo(testBody);
+     assertThat(message1.header().getDestination()).isEqualTo(queueName);
+     assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+     assertThat(message2).isNotNull();
+     assertThat(message2.getData()).isEqualTo(testBody1);
+     assertThat(message2.header().getDestination()).isEqualTo(queueName);
+     assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
+
+     // Part 2: batch receive for an explicit queue, served from the (stubbed) RocketMQ consumer.
+     PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
+     doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
+     Set<MessageQueue> messageQueues = new HashSet<>();
+     messageQueues.add(new MessageQueue(queueName, "breakeName", 0));
+     doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
+
+     QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
+     MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
+     long timeout = 3000L;
+     List<Message> message1s = pullConsumer.batchReceive(queueName, queueMetaData, messageReceipt, timeout);
+     assertThat(message1s.size()).isEqualTo(2);
+
+     Message message3 = null;
+     Message message4 = null;
+     for (Message message : message1s) {
+         if (message.header().getMessageId().equals("NewMsgId")) {
+             message3 = message;
+         }
+         if (message.header().getMessageId().equals("NewMsgId1")) {
+             message4 = message;
+         }
+     }
+     assertThat(message3).isNotNull();
+     assertThat(message3.getData()).isEqualTo(testBody);
+     assertThat(message3.header().getDestination()).isEqualTo(queueName);
+     assertThat(message3.extensionHeader().getPartiton()).isEqualTo(0);
+     assertThat(message4).isNotNull();
+     assertThat(message4.getData()).isEqualTo(testBody1);
+     assertThat(message4.header().getDestination()).isEqualTo(queueName);
+     assertThat(message4.extensionHeader().getPartiton()).isEqualTo(0);
+ }
@Test
public void testPoll_WithTimeout() {
- Message message = consumer.receive(3 * 1000);
+ Message message = pullConsumer.receive(3 * 1000);
assertThat(message).isNull();
}
}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 51167e8..39af3c1 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -16,12 +16,21 @@
*/
package io.openmessaging.rocketmq.consumer;
-import io.openmessaging.*;
-import io.openmessaging.consumer.Consumer;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.OMS;
+import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.MessageListener;
-import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
@@ -31,15 +40,12 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.lang.reflect.Field;
-import java.util.Collections;
-
-import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest {
- private Consumer consumer;
+ private PushConsumer pushConsumer;
@Mock
private DefaultMQPushConsumer rocketmqPushConsumer;
@@ -48,17 +54,17 @@ public class PushConsumerImplTest {
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
- final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
- resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup");
- consumer = messagingAccessPoint.createConsumer();
+ final KeyValue attributes = messagingAccessPoint.attributes();
+ attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
+ pushConsumer = messagingAccessPoint.createPushConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
- DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
- field.set(consumer, rocketmqPushConsumer); //Replace
+ DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(pushConsumer);
+ field.set(pushConsumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
- consumer.start();
+ pushConsumer.start();
}
@Test
@@ -70,7 +76,11 @@ public class PushConsumerImplTest {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
- consumer.bindQueue("HELLO_QUEUE", new MessageListener() {
+ consumedMsg.setQueueId(0);
+ consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+ Set<String> queueNames = new HashSet<>(8);
+ queueNames.add("HELLO_QUEUE");
+ pushConsumer.bindQueue(queueNames, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
@@ -81,4 +91,65 @@ public class PushConsumerImplTest {
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
}
+
+ /**
+  * Binds a BatchMessageListener to "HELLO_QUEUE" and pushes two messages through the
+  * RocketMQ listener registered on the mocked push consumer. The assertions on the
+  * converted batch live inside the listener callback.
+  */
+ @Test
+ public void testBatchConsumeMessage() {
+     final byte[] testBody = new byte[]{'a', 'b'};
+
+     MessageExt consumedMsg = new MessageExt();
+     consumedMsg.setMsgId("NewMsgId");
+     consumedMsg.setBody(testBody);
+     consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     consumedMsg.setTopic("HELLO_QUEUE");
+     consumedMsg.setQueueId(0);
+     consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+
+     final byte[] testBody1 = new byte[]{'c', 'd'};
+     MessageExt consumedMsg1 = new MessageExt();
+     consumedMsg1.setMsgId("NewMsgId1");
+     consumedMsg1.setBody(testBody1);
+     consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+     consumedMsg1.setTopic("HELLO_QUEUE");
+     consumedMsg1.setQueueId(0);
+     consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
+
+     // A plain ArrayList replaces the former double-brace initialization, which created an
+     // anonymous subclass holding a reference to the enclosing test instance.
+     List<MessageExt> messageExts = new ArrayList<>();
+     messageExts.add(consumedMsg);
+     messageExts.add(consumedMsg1);
+
+     Set<String> queueNames = new HashSet<>(8);
+     queueNames.add("HELLO_QUEUE");
+     pushConsumer.bindQueue(queueNames, new BatchMessageListener() {
+         @Override
+         public void onReceived(List<Message> batchMessage, Context context) {
+             assertThat(batchMessage.size()).isEqualTo(2);
+             // Locate the messages by id so the check does not depend on batch order.
+             Message message1 = null;
+             Message message2 = null;
+             for (Message message : batchMessage) {
+                 if (message.header().getMessageId().equals("NewMsgId")) {
+                     message1 = message;
+                 }
+                 if (message.header().getMessageId().equals("NewMsgId1")) {
+                     message2 = message;
+                 }
+             }
+             assertThat(message1).isNotNull();
+             assertThat(message1.getData()).isEqualTo(testBody);
+             assertThat(message1.header().getDestination()).isEqualTo("HELLO_QUEUE");
+             assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
+             assertThat(message2).isNotNull();
+             assertThat(message2.getData()).isEqualTo(testBody1);
+             assertThat(message2.header().getDestination()).isEqualTo("HELLO_QUEUE");
+             assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
+
+             context.ack();
+         }
+     });
+
+     // Drive the RocketMQ-side listener directly with both messages in one call.
+     ((MessageListenerConcurrently) rocketmqPushConsumer
+         .getMessageListener()).consumeMessage(messageExts, null);
+ }
+
}
\ No newline at end of file