You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/05/09 06:46:29 UTC
[rocketmq] branch feature_oms_1.0.0 updated: [issue#1198]Implement
the 1.0.0 openmessaging producer API for rocketmq oms module.
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch feature_oms_1.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/feature_oms_1.0.0 by this push:
new 3180e55 [issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.
new 03c5005 Merge pull request #1202 from zongtanghu/feature_oms_1.0.0
3180e55 is described below
commit 3180e55fab14542afd6ee881d44879e4b87a7453
Author: huzongtang <hu...@cmss.chinamobile.com>
AuthorDate: Thu May 9 14:15:10 2019 +0800
[issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.
---
.../example/openmessaging/SimpleProducer.java | 16 ++-
.../rocketmq/MessagingAccessPointImpl.java | 60 +++--------
.../rocketmq/domain/BytesMessageImpl.java | 92 +++++------------
.../rocketmq/domain/MessageHeader.java | 113 +++++++++++++++++++++
.../rocketmq/domain/NonStandardKeys.java | 2 +
.../rocketmq/domain/RocketMQConstants.java | 2 +
.../rocketmq/producer/AbstractOMSProducer.java | 45 ++++----
.../rocketmq/producer/ProducerImpl.java | 103 ++++++++++++-------
.../rocketmq/promise/DefaultPromise.java | 2 +-
.../io/openmessaging/rocketmq/utils/OMSUtil.java | 107 +++++--------------
.../rocketmq/producer/ProducerImplTest.java | 10 +-
.../rocketmq/promise/DefaultPromiseTest.java | 6 +-
12 files changed, 271 insertions(+), 287 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index dbe1d10..3a780b0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
@@ -32,15 +32,11 @@ public class SimpleProducer {
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
-
- messagingAccessPoint.startup();
- System.out.printf("MessagingAccessPoint startup OK%n");
-
- producer.startup();
+ producer.start();
System.out.printf("Producer startup OK%n");
{
- Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ Message message = producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
@@ -48,7 +44,7 @@ public class SimpleProducer {
final CountDownLatch countDownLatch = new CountDownLatch(1);
{
- final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ final Future<SendResult> result = producer.sendAsync(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() {
@Override
public void operationComplete(Future<SendResult> future) {
@@ -63,7 +59,7 @@ public class SimpleProducer {
}
{
- producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ producer.sendOneway(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
@@ -73,6 +69,6 @@ public class SimpleProducer {
} catch (InterruptedException ignore) {
}
- producer.shutdown();
+ producer.stop();
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 51388f9..0655fa5 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -18,16 +18,13 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.ResourceManager;
-import io.openmessaging.consumer.PullConsumer;
-import io.openmessaging.consumer.PushConsumer;
-import io.openmessaging.consumer.StreamingConsumer;
-import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.consumer.Consumer;
+import io.openmessaging.exception.OMSUnsupportException;
+import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
-import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
-import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
+import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.producer.ProducerImpl;
-import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
@@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
- public String implVersion() {
- return "0.3.0";
+ public String version() {
+ return "1.0.0";
}
@Override
@@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return new ProducerImpl(this.accessPointProperties);
}
- @Override
- public Producer createProducer(KeyValue properties) {
- return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
- }
-
- @Override
- public PushConsumer createPushConsumer() {
- return new PushConsumerImpl(accessPointProperties);
- }
-
- @Override
- public PushConsumer createPushConsumer(KeyValue properties) {
- return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
- }
-
- @Override
- public PullConsumer createPullConsumer() {
- return new PullConsumerImpl(accessPointProperties);
- }
-
- @Override
- public PullConsumer createPullConsumer(KeyValue attributes) {
- return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
- }
-
- @Override
- public StreamingConsumer createStreamingConsumer() {
+ @Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) {
return null;
}
- @Override
- public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
+ @Override public Consumer createConsumer() {
return null;
}
@Override
public ResourceManager resourceManager() {
- throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
+ throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version.");
}
- @Override
- public void startup() {
- //Ignore
- }
-
- @Override
- public void shutdown() {
- //Ignore
+ @Override public MessageFactory messageFactory() {
+ return null;
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index 6d8995a..f1405b2 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -16,98 +16,52 @@
*/
package io.openmessaging.rocketmq.domain;
-import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
+import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.extension.ExtensionHeader;
+import io.openmessaging.message.Header;
+import io.openmessaging.message.Message;
import io.openmessaging.OMS;
-import io.openmessaging.exception.OMSMessageFormatException;
-import org.apache.commons.lang3.builder.ToStringBuilder;
+import java.util.Optional;
-public class BytesMessageImpl implements BytesMessage {
- private KeyValue sysHeaders;
- private KeyValue userHeaders;
- private byte[] body;
+public class BytesMessageImpl implements Message {
- public BytesMessageImpl() {
- this.sysHeaders = OMS.newKeyValue();
- this.userHeaders = OMS.newKeyValue();
- }
-
- @Override
- public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
- if (type == byte[].class) {
- return (T)body;
- }
-
- throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
- }
+ private Header sysHeaders;
+ private KeyValue userProperties;
+ private byte[] data;
- @Override
- public BytesMessage setBody(final byte[] body) {
- this.body = body;
- return this;
+ public BytesMessageImpl() {
+ this.sysHeaders = new MessageHeader();
+ this.userProperties = OMS.newKeyValue();
}
@Override
- public KeyValue sysHeaders() {
+ public Header header() {
return sysHeaders;
}
@Override
- public KeyValue userHeaders() {
- return userHeaders;
- }
-
- @Override
- public Message putSysHeaders(String key, int value) {
- sysHeaders.put(key, value);
- return this;
- }
-
- @Override
- public Message putSysHeaders(String key, long value) {
- sysHeaders.put(key, value);
- return this;
- }
-
- @Override
- public Message putSysHeaders(String key, double value) {
- sysHeaders.put(key, value);
- return this;
- }
-
- @Override
- public Message putSysHeaders(String key, String value) {
- sysHeaders.put(key, value);
- return this;
- }
-
- @Override
- public Message putUserHeaders(String key, int value) {
- userHeaders.put(key, value);
- return this;
+ public Optional<ExtensionHeader> extensionHeader() {
+ return Optional.empty(); // no extension header is stored; an Optional-returning method must never return null
}
@Override
- public Message putUserHeaders(String key, long value) {
- userHeaders.put(key, value);
- return this;
+ public KeyValue properties() {
+ return userProperties;
}
@Override
- public Message putUserHeaders(String key, double value) {
- userHeaders.put(key, value);
- return this;
+ public byte[] getData() {
+ return this.data;
}
@Override
- public Message putUserHeaders(String key, String value) {
- userHeaders.put(key, value);
- return this;
+ public void setData(byte[] data) {
+ this.data = data;
}
@Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
+ public MessageReceipt getMessageReceipt() {
+ return null;
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
new file mode 100644
index 0000000..8dda492
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.message.Header;
+
+public class MessageHeader implements Header{
+
+ private String destination;
+
+ private String messageId;
+
+ private long bornTimestamp;
+
+ private String bornHost;
+
+ private short priority;
+
+ private int deliveryCount;
+
+ private short compression;
+
+ private short durability;
+
+ public MessageHeader() {
+ }
+
+ @Override public Header setDestination(String destination) {
+ this.destination = destination;
+ return this;
+ }
+
+ @Override public Header setMessageId(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ @Override public Header setBornTimestamp(long bornTimestamp) {
+ this.bornTimestamp = bornTimestamp;
+ return this;
+ }
+
+ @Override public Header setBornHost(String bornHost) {
+ this.bornHost = bornHost;
+ return this;
+ }
+
+ @Override public Header setPriority(short priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ @Override public Header setDurability(short durability) {
+ this.durability = durability;
+ return this;
+ }
+
+ @Override public Header setDeliveryCount(int deliveryCount) {
+ this.deliveryCount = deliveryCount;
+ return this;
+ }
+
+ @Override public Header setCompression(short compression) {
+ this.compression = compression;
+ return this;
+ }
+
+ @Override public String getDestination() {
+ return this.destination;
+ }
+
+ @Override public String getMessageId() {
+ return this.messageId;
+ }
+
+ @Override public long getBornTimestamp() {
+ return this.bornTimestamp;
+ }
+
+ @Override public String getBornHost() {
+ return this.bornHost;
+ }
+
+ @Override public short getPriority() {
+ return this.priority;
+ }
+
+ @Override public short getDurability() {
+ return this.durability;
+ }
+
+ @Override public int getDeliveryCount() {
+ return this.deliveryCount;
+ }
+
+ @Override public short getCompression() {
+ return this.compression;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 3639a3f..c8d7bb3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -27,4 +27,6 @@ public interface NonStandardKeys {
String MESSAGE_DESTINATION = "rmq.message.destination";
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
+ String PRODUCER_ID = "PRODUCER_ID";
+ String CONSUMER_ID ="CONSUMER_ID";
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
index 2bebc8a..b4b4753 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
@@ -23,4 +23,6 @@ public interface RocketMQConstants {
*/
String START_DELIVER_TIME = "__STARTDELIVERTIME";
+ String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 3db8590..63034e3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -16,18 +16,17 @@
*/
package io.openmessaging.rocketmq.producer;
-import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
-import io.openmessaging.MessageFactory;
-import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.message.Message;
+import io.openmessaging.message.MessageFactory;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
-import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
- throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
}
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
@@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
this.rocketmqProducer.setLanguage(LanguageCode.OMS);
- properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
+ properties.put(NonStandardKeys.PRODUCER_ID, producerId);
}
@Override
- public synchronized void startup() {
+ public synchronized void start() {
if (!started) {
try {
this.rocketmqProducer.start();
} catch (MQClientException e) {
- throw new OMSRuntimeException("-1", e);
+ throw new OMSRuntimeException(-1, e);
}
}
this.started = true;
}
@Override
- public synchronized void shutdown() {
+ public synchronized void stop() {
if (this.started) {
this.rocketmqProducer.shutdown();
}
@@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if (e instanceof MQClientException) {
if (e.getCause() != null) {
if (e.getCause() instanceof RemotingTimeoutException) {
- return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+ return new OMSTimeOutException(-1, String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
} else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
if (e.getCause() instanceof MQBrokerException) {
MQBrokerException brokerException = (MQBrokerException) e.getCause();
- return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+ return new OMSRuntimeException(-1, String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
topic, msgId, brokerException.getErrorMessage()), e);
}
if (e.getCause() instanceof RemotingConnectException) {
RemotingConnectException connectException = (RemotingConnectException)e.getCause();
- return new OMSRuntimeException("-1",
+ return new OMSRuntimeException(-1,
String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s",
- topic, msgId, connectException.getMessage()),
- e);
+ topic, msgId, connectException.getMessage()), e);
}
}
}
@@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
else {
MQClientException clientException = (MQClientException) e;
if (-1 == clientException.getResponseCode()) {
- return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+ return new OMSRuntimeException(-1, String.format("Topic does not exist, Topic=%s, msgId=%s",
topic, msgId), e);
} else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
- return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+ return new OMSMessageFormatException(-1, String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
topic, msgId), e);
}
}
}
- return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+ return new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.", e);
}
protected void checkMessageType(Message message) {
- if (!(message instanceof BytesMessage)) {
- throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+ if (!(message instanceof BytesMessageImpl)) {
+ throw new OMSUnsupportException(-1, "Only BytesMessage is supported.");
}
}
- @Override
- public BytesMessage createBytesMessage(String queue, byte[] body) {
- BytesMessage message = new BytesMessageImpl();
- message.setBody(body);
- message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
- return message;
- }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index c2b6d3e..d3acce2 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -16,18 +16,24 @@
*/
package io.openmessaging.rocketmq.producer;
-import io.openmessaging.BytesMessage;
+import io.openmessaging.Future;
import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
+import io.openmessaging.ServiceLifeState;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.extension.Extension;
+import io.openmessaging.extension.QueueMetaData;
+import io.openmessaging.message.Message;
import io.openmessaging.Promise;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
-import io.openmessaging.producer.BatchMessageSender;
-import io.openmessaging.producer.LocalTransactionExecutor;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
+import io.openmessaging.producer.TransactionalResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.List;
+import java.util.Optional;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -40,41 +46,24 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
- public KeyValue attributes() {
- return properties;
- }
-
- @Override
public SendResult send(final Message message) {
return send(message, this.rocketmqProducer.getSendMsgTimeout());
}
- @Override
- public SendResult send(final Message message, final KeyValue properties) {
- long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
- ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
- return send(message, timeout);
- }
-
- @Override
- public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
- return null;
- }
-
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
- org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try {
org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error(String.format("Send message to RocketMQ failed, %s", message));
- throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
+ throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
}
- message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
+ message.header().setMessageId(rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
- throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
+ throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
}
}
@@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
}
- @Override
- public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
- long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
- ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
- return sendAsync(message, timeout);
- }
-
private Promise<SendResult> sendAsync(final Message message, long timeout) {
checkMessageType(message);
- org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
final Promise<SendResult> promise = new DefaultPromise<>();
try {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
- message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
+ message.header().setMessageId(rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
@@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public void sendOneway(final Message message) {
checkMessageType(message);
- org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try {
this.rocketmqProducer.sendOneway(rmqMessage);
} catch (Exception ignore) { //Ignore the oneway exception.
@@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
- public void sendOneway(final Message message, final KeyValue properties) {
- sendOneway(message);
+ public void send(List<Message> messages) {
+ if (messages == null || messages.isEmpty()) {
+ throw new OMSMessageFormatException(-1, "The messages collection is empty");
+ }
+
+ for (Message message : messages) {
+ send(message); // synchronous send of each element; sendOneway(messages) re-entered the list overload instead
+ }
}
@Override
- public BatchMessageSender createBatchMessageSender() {
+ public Future<SendResult> sendAsync(List<Message> messages) {
return null;
}
@Override
- public void addInterceptor(ProducerInterceptor interceptor) {
+ public void sendOneway(List<Message> messages) {
+ if (messages == null || messages.isEmpty()) {
+ throw new OMSMessageFormatException(-1, "The messages collection is empty");
+ }
+ for (Message message : messages) {
+ sendOneway(message); // single-message overload; passing the list back in called this method again without end
+ }
+ }
+
+ @Override
+ public void addInterceptor(ProducerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ProducerInterceptor interceptor) {
+ }
+
+ @Override
+ public TransactionalResult prepare(Message message) {
+ return null;
+ }
+
+ @Override
+ public ServiceLifeState currentState() {
+ return null;
+ }
+
+ @Override
+ public Optional<Extension> getExtension() {
+ return Optional.empty(); // extensions are not implemented here; empty Optional instead of null keeps callers NPE-free
+ }
+ @Override
+ public QueueMetaData getQueueMetaData(String queueName) {
+ return null;
+ }
+
+ @Override
+ public Message createMessage(String queueName, byte[] body) {
+ Message message = new BytesMessageImpl();
+ message.setData(body);
+ message.header().setDestination(queueName);
+ return message;
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index c1b5999..e472d2a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -175,7 +175,7 @@ public class DefaultPromise<V> implements Promise<V> {
private V getValueOrThrowable() {
if (exception != null) {
Throwable e = exception.getCause() != null ? exception.getCause() : exception;
- throw new OMSRuntimeException("-1", e);
+ throw new OMSRuntimeException(-1, e);
}
notifyListeners();
return result;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
index 66af8ce..5b095ee 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -16,18 +16,15 @@
*/
package io.openmessaging.rocketmq.utils;
-import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
-import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
+import io.openmessaging.message.Header;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
-import java.util.Iterator;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
@@ -44,68 +41,55 @@ public class OMSUtil {
return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
}
- public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+ public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessageImpl omsMessage) {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
- rmqMessage.setBody(omsMessage.getBody(byte[].class));
+ rmqMessage.setBody(omsMessage.getData());
- KeyValue sysHeaders = omsMessage.sysHeaders();
- KeyValue userHeaders = omsMessage.userHeaders();
+ Header sysHeaders = omsMessage.header();
+ KeyValue userHeaders = omsMessage.properties();
//All destinations in RocketMQ use Topic
- rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
+ rmqMessage.setTopic(sysHeaders.getDestination());
- if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
- long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
- if (deliverTime > 0) {
- rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
- }
+ long deliverTime = sysHeaders.getBornTimestamp();
+ if (deliverTime > 0) {
+ rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
+
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
- //System headers has a high priority
- for (String key : sysHeaders.keySet()) {
- MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
- }
-
+ MessageAccessor.putProperty(rmqMessage, RocketMQConstants.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(sysHeaders.getDeliveryCount()));
return rmqMessage;
}
- public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
- BytesMessage omsMsg = new BytesMessageImpl();
- omsMsg.setBody(rmqMsg.getBody());
-
- KeyValue headers = omsMsg.sysHeaders();
- KeyValue properties = omsMsg.userHeaders();
+ public static BytesMessageImpl msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
+ BytesMessageImpl omsMsg = new BytesMessageImpl();
+ omsMsg.setData(rmqMsg.getBody());
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
for (final Map.Entry<String, String> entry : entries) {
- if (isOMSHeader(entry.getKey())) {
- headers.put(entry.getKey(), entry.getValue());
- } else {
- properties.put(entry.getKey(), entry.getValue());
+ if (!isOMSHeader(entry.getKey())) {
+ omsMsg.properties().put(entry.getKey(), entry.getValue());
}
}
- omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
-
- omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
+ omsMsg.header().setMessageId(rmqMsg.getMsgId());
+ omsMsg.header().setDestination(rmqMsg.getTopic());
+ omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
+ omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
+ omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
- omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
- omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
- omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
- omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
- omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
- for (Field field : BuiltinKeys.class.getDeclaredFields()) {
+ for (Field field : Header.class.getDeclaredFields()) {
try {
- if (field.get(BuiltinKeys.class).equals(value)) {
+ if (field.get(Header.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
@@ -132,49 +116,4 @@ public class OMSUtil {
}
return keyValue;
}
-
- /**
- * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
- */
- public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
- return new Iterator<T>() {
- Iterator<T> iterator = new Iterator<T>() {
- @Override
- public synchronized boolean hasNext() {
- return false;
- }
-
- @Override
- public synchronized T next() {
- throw new NoSuchElementException();
- }
-
- @Override
- public synchronized void remove() {
- //Ignore
- }
- };
-
- @Override
- public synchronized boolean hasNext() {
- return iterator.hasNext() || iterable.iterator().hasNext();
- }
-
- @Override
- public synchronized T next() {
- if (!iterator.hasNext()) {
- iterator = iterable.iterator();
- if (!iterator.hasNext()) {
- throw new NoSuchElementException();
- }
- }
- return iterator.next();
- }
-
- @Override
- public synchronized void remove() {
- iterator.remove();
- }
- };
- }
}
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index fc6515e..61178c9 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -56,9 +56,7 @@ public class ProducerImplTest {
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
-
- messagingAccessPoint.startup();
- producer.startup();
+ producer.start();
}
@Test
@@ -68,7 +66,7 @@ public class ProducerImplTest {
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.producer.SendResult omsResult =
- producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+ producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
@@ -80,7 +78,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
- producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+ producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
@@ -91,7 +89,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
- producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+ producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index f226ede..0f6414f 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -87,7 +87,7 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
- final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception);
promise.addListener(new FutureListener<String>() {
@Override
@@ -99,7 +99,7 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
- final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) {
@@ -112,7 +112,7 @@ public class DefaultPromiseTest {
@Test
public void getThrowable() throws Exception {
assertThat(promise.getThrowable()).isNull();
- Throwable exception = new OMSRuntimeException("-1", "Test Error");
+ Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception);
assertThat(promise.getThrowable()).isEqualTo(exception);
}