You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/05/09 06:46:29 UTC

[rocketmq] branch feature_oms_1.0.0 updated: [issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch feature_oms_1.0.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/feature_oms_1.0.0 by this push:
     new 3180e55  [issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.
     new 03c5005  Merge pull request #1202 from zongtanghu/feature_oms_1.0.0
3180e55 is described below

commit 3180e55fab14542afd6ee881d44879e4b87a7453
Author: huzongtang <hu...@cmss.chinamobile.com>
AuthorDate: Thu May 9 14:15:10 2019 +0800

    [issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.
---
 .../example/openmessaging/SimpleProducer.java      |  16 ++-
 .../rocketmq/MessagingAccessPointImpl.java         |  60 +++--------
 .../rocketmq/domain/BytesMessageImpl.java          |  92 +++++------------
 .../rocketmq/domain/MessageHeader.java             | 113 +++++++++++++++++++++
 .../rocketmq/domain/NonStandardKeys.java           |   2 +
 .../rocketmq/domain/RocketMQConstants.java         |   2 +
 .../rocketmq/producer/AbstractOMSProducer.java     |  45 ++++----
 .../rocketmq/producer/ProducerImpl.java            | 103 ++++++++++++-------
 .../rocketmq/promise/DefaultPromise.java           |   2 +-
 .../io/openmessaging/rocketmq/utils/OMSUtil.java   | 107 +++++--------------
 .../rocketmq/producer/ProducerImplTest.java        |  10 +-
 .../rocketmq/promise/DefaultPromiseTest.java       |   6 +-
 12 files changed, 271 insertions(+), 287 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index dbe1d10..3a780b0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging;
 
 import io.openmessaging.Future;
 import io.openmessaging.FutureListener;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
 import io.openmessaging.producer.Producer;
@@ -32,15 +32,11 @@ public class SimpleProducer {
             OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
         final Producer producer = messagingAccessPoint.createProducer();
-
-        messagingAccessPoint.startup();
-        System.out.printf("MessagingAccessPoint startup OK%n");
-
-        producer.startup();
+        producer.start();
         System.out.printf("Producer startup OK%n");
 
         {
-            Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            Message message = producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
             SendResult sendResult = producer.send(message);
             //final Void aVoid = result.get(3000L);
             System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
@@ -48,7 +44,7 @@ public class SimpleProducer {
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         {
-            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            final Future<SendResult> result = producer.sendAsync(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
             result.addListener(new FutureListener<SendResult>() {
                 @Override
                 public void operationComplete(Future<SendResult> future) {
@@ -63,7 +59,7 @@ public class SimpleProducer {
         }
 
         {
-            producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            producer.sendOneway(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
             System.out.printf("Send oneway message OK%n");
         }
 
@@ -73,6 +69,6 @@ public class SimpleProducer {
         } catch (InterruptedException ignore) {
         }
 
-        producer.shutdown();
+        producer.stop();
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 51388f9..0655fa5 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -18,16 +18,13 @@ package io.openmessaging.rocketmq;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.ResourceManager;
-import io.openmessaging.consumer.PullConsumer;
-import io.openmessaging.consumer.PushConsumer;
-import io.openmessaging.consumer.StreamingConsumer;
-import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.consumer.Consumer;
+import io.openmessaging.exception.OMSUnsupportException;
+import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.MessageFactory;
 import io.openmessaging.producer.Producer;
-import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
-import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
+import io.openmessaging.producer.TransactionStateCheckListener;
 import io.openmessaging.rocketmq.producer.ProducerImpl;
-import io.openmessaging.rocketmq.utils.OMSUtil;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
 
@@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
     }
 
     @Override
-    public String implVersion() {
-        return "0.3.0";
+    public String version() {
+        return "1.0.0";
     }
 
     @Override
@@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
         return new ProducerImpl(this.accessPointProperties);
     }
 
-    @Override
-    public Producer createProducer(KeyValue properties) {
-        return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
-    }
-
-    @Override
-    public PushConsumer createPushConsumer() {
-        return new PushConsumerImpl(accessPointProperties);
-    }
-
-    @Override
-    public PushConsumer createPushConsumer(KeyValue properties) {
-        return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
-    }
-
-    @Override
-    public PullConsumer createPullConsumer() {
-        return new PullConsumerImpl(accessPointProperties);
-    }
-
-    @Override
-    public PullConsumer createPullConsumer(KeyValue attributes) {
-        return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
-    }
-
-    @Override
-    public StreamingConsumer createStreamingConsumer() {
+    @Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) {
         return null;
     }
 
-    @Override
-    public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
+    @Override public Consumer createConsumer() {
         return null;
     }
 
     @Override
     public ResourceManager resourceManager() {
-        throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
+        throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version.");
     }
 
-    @Override
-    public void startup() {
-        //Ignore
-    }
-
-    @Override
-    public void shutdown() {
-        //Ignore
+    @Override public MessageFactory messageFactory() {
+        return null;
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index 6d8995a..f1405b2 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -16,98 +16,52 @@
  */
 package io.openmessaging.rocketmq.domain;
 
-import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
+import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.extension.ExtensionHeader;
+import io.openmessaging.message.Header;
+import io.openmessaging.message.Message;
 import io.openmessaging.OMS;
-import io.openmessaging.exception.OMSMessageFormatException;
-import org.apache.commons.lang3.builder.ToStringBuilder;
+import java.util.Optional;
 
-public class BytesMessageImpl implements BytesMessage {
-    private KeyValue sysHeaders;
-    private KeyValue userHeaders;
-    private byte[] body;
+public class BytesMessageImpl implements Message {
 
-    public BytesMessageImpl() {
-        this.sysHeaders = OMS.newKeyValue();
-        this.userHeaders = OMS.newKeyValue();
-    }
-
-    @Override
-    public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
-        if (type == byte[].class) {
-            return (T)body;
-        }
-
-        throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
-    }
+    private Header sysHeaders;
+    private KeyValue userProperties;
+    private byte[] data;
 
-    @Override
-    public BytesMessage setBody(final byte[] body) {
-        this.body = body;
-        return this;
+    public BytesMessageImpl() {
+        this.sysHeaders = new MessageHeader();
+        this.userProperties = OMS.newKeyValue();
     }
 
     @Override
-    public KeyValue sysHeaders() {
+    public Header header() {
         return sysHeaders;
     }
 
     @Override
-    public KeyValue userHeaders() {
-        return userHeaders;
-    }
-
-    @Override
-    public Message putSysHeaders(String key, int value) {
-        sysHeaders.put(key, value);
-        return this;
-    }
-
-    @Override
-    public Message putSysHeaders(String key, long value) {
-        sysHeaders.put(key, value);
-        return this;
-    }
-
-    @Override
-    public Message putSysHeaders(String key, double value) {
-        sysHeaders.put(key, value);
-        return this;
-    }
-
-    @Override
-    public Message putSysHeaders(String key, String value) {
-        sysHeaders.put(key, value);
-        return this;
-    }
-
-    @Override
-    public Message putUserHeaders(String key, int value) {
-        userHeaders.put(key, value);
-        return this;
+    public Optional<ExtensionHeader> extensionHeader() {
+        return null;
     }
 
     @Override
-    public Message putUserHeaders(String key, long value) {
-        userHeaders.put(key, value);
-        return this;
+    public KeyValue properties() {
+        return userProperties;
     }
 
     @Override
-    public Message putUserHeaders(String key, double value) {
-        userHeaders.put(key, value);
-        return this;
+    public byte[] getData() {
+        return this.data;
     }
 
     @Override
-    public Message putUserHeaders(String key, String value) {
-        userHeaders.put(key, value);
-        return this;
+    public void setData(byte[] data) {
+        this.data = data;
     }
 
     @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this);
+    public MessageReceipt getMessageReceipt() {
+        return null;
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
new file mode 100644
index 0000000..8dda492
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.message.Header;
+
+public class MessageHeader implements Header{
+
+    private String destination;
+
+    private String messageId;
+
+    private long bornTimestamp;
+
+    private String bornHost;
+
+    private short priority;
+
+    private int deliveryCount;
+
+    private short compression;
+
+    private short durability;
+
+    public MessageHeader() {
+    }
+
+    @Override public Header setDestination(String destination) {
+        this.destination = destination;
+        return this;
+    }
+
+    @Override public Header setMessageId(String messageId) {
+        this.messageId = messageId;
+        return this;
+    }
+
+    @Override public Header setBornTimestamp(long bornTimestamp) {
+        this.bornTimestamp = bornTimestamp;
+        return this;
+    }
+
+    @Override public Header setBornHost(String bornHost) {
+        this.bornHost = bornHost;
+        return this;
+    }
+
+    @Override public Header setPriority(short priority) {
+        this.priority = priority;
+        return this;
+    }
+
+    @Override public Header setDurability(short durability) {
+        this.durability = durability;
+        return this;
+    }
+
+    @Override public Header setDeliveryCount(int deliveryCount) {
+        this.deliveryCount = deliveryCount;
+        return this;
+    }
+
+    @Override public Header setCompression(short compression) {
+        this.compression = compression;
+        return this;
+    }
+
+    @Override public String getDestination() {
+        return this.destination;
+    }
+
+    @Override public String getMessageId() {
+        return this.messageId;
+    }
+
+    @Override public long getBornTimestamp() {
+        return this.bornTimestamp;
+    }
+
+    @Override public String getBornHost() {
+        return this.bornHost;
+    }
+
+    @Override public short getPriority() {
+        return this.priority;
+    }
+
+    @Override public short getDurability() {
+        return this.durability;
+    }
+
+    @Override public int getDeliveryCount() {
+        return this.deliveryCount;
+    }
+
+    @Override public short getCompression() {
+        return this.compression;
+    }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index 3639a3f..c8d7bb3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -27,4 +27,6 @@ public interface NonStandardKeys {
     String MESSAGE_DESTINATION = "rmq.message.destination";
     String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
     String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
+    String PRODUCER_ID = "PRODUCER_ID";
+    String CONSUMER_ID ="CONSUMER_ID";
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
index 2bebc8a..b4b4753 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
@@ -23,4 +23,6 @@ public interface RocketMQConstants {
      */
     String START_DELIVER_TIME = "__STARTDELIVERTIME";
 
+    String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 3db8590..63034e3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -16,18 +16,17 @@
  */
 package io.openmessaging.rocketmq.producer;
 
-import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
-import io.openmessaging.MessageFactory;
-import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.message.Message;
+import io.openmessaging.message.MessageFactory;
 import io.openmessaging.ServiceLifecycle;
 import io.openmessaging.exception.OMSMessageFormatException;
-import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSUnsupportException;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.exception.OMSTimeOutException;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.utils.BeanUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
             String accessPoints = clientConfig.getAccessPoints();
             if (accessPoints == null || accessPoints.isEmpty()) {
-                throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+                throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
             }
 
             this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
@@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         this.rocketmqProducer.setInstanceName(producerId);
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
         this.rocketmqProducer.setLanguage(LanguageCode.OMS);
-        properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
+        properties.put(NonStandardKeys.PRODUCER_ID, producerId);
     }
 
     @Override
-    public synchronized void startup() {
+    public synchronized void start() {
         if (!started) {
             try {
                 this.rocketmqProducer.start();
             } catch (MQClientException e) {
-                throw new OMSRuntimeException("-1", e);
+                throw new OMSRuntimeException(-1, e);
             }
         }
         this.started = true;
     }
 
     @Override
-    public synchronized void shutdown() {
+    public synchronized void stop() {
         if (this.started) {
             this.rocketmqProducer.shutdown();
         }
@@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         if (e instanceof MQClientException) {
             if (e.getCause() != null) {
                 if (e.getCause() instanceof RemotingTimeoutException) {
-                    return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+                    return new OMSTimeOutException(-1, String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
                         this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
                 } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
                     if (e.getCause() instanceof MQBrokerException) {
                         MQBrokerException brokerException = (MQBrokerException) e.getCause();
-                        return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+                        return new OMSRuntimeException(-1, String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
                             topic, msgId, brokerException.getErrorMessage()), e);
                     }
 
                     if (e.getCause() instanceof RemotingConnectException) {
                         RemotingConnectException connectException = (RemotingConnectException)e.getCause();
-                        return new OMSRuntimeException("-1",
+                        return new OMSRuntimeException(-1,
                             String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s",
-                                topic, msgId, connectException.getMessage()),
-                            e);
+                                topic, msgId, connectException.getMessage()), e);
                     }
                 }
             }
@@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
             else {
                 MQClientException clientException = (MQClientException) e;
                 if (-1 == clientException.getResponseCode()) {
-                    return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+                    return new OMSRuntimeException(-1, String.format("Topic does not exist, Topic=%s, msgId=%s",
                         topic, msgId), e);
                 } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
-                    return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+                    return new OMSMessageFormatException(-1, String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
                         topic, msgId), e);
                 }
             }
         }
-        return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+        return new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.", e);
     }
 
     protected void checkMessageType(Message message) {
-        if (!(message instanceof BytesMessage)) {
-            throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+        if (!(message instanceof BytesMessageImpl)) {
+            throw new OMSUnsupportException(-1, "Only BytesMessage is supported.");
         }
     }
 
-    @Override
-    public BytesMessage createBytesMessage(String queue, byte[] body) {
-        BytesMessage message = new BytesMessageImpl();
-        message.setBody(body);
-        message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
-        return message;
-    }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index c2b6d3e..d3acce2 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -16,18 +16,24 @@
  */
 package io.openmessaging.rocketmq.producer;
 
-import io.openmessaging.BytesMessage;
+import io.openmessaging.Future;
 import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
+import io.openmessaging.ServiceLifeState;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.extension.Extension;
+import io.openmessaging.extension.QueueMetaData;
+import io.openmessaging.message.Message;
 import io.openmessaging.Promise;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.interceptor.ProducerInterceptor;
-import io.openmessaging.producer.BatchMessageSender;
-import io.openmessaging.producer.LocalTransactionExecutor;
 import io.openmessaging.producer.Producer;
 import io.openmessaging.producer.SendResult;
+import io.openmessaging.producer.TransactionalResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
 import io.openmessaging.rocketmq.promise.DefaultPromise;
 import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.List;
+import java.util.Optional;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendStatus;
 
@@ -40,41 +46,24 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     }
 
     @Override
-    public KeyValue attributes() {
-        return properties;
-    }
-
-    @Override
     public SendResult send(final Message message) {
         return send(message, this.rocketmqProducer.getSendMsgTimeout());
     }
 
-    @Override
-    public SendResult send(final Message message, final KeyValue properties) {
-        long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
-            ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
-        return send(message, timeout);
-    }
-
-    @Override
-    public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
-        return null;
-    }
-
     private SendResult send(final Message message, long timeout) {
         checkMessageType(message);
-        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
         try {
             org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
             if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                 log.error(String.format("Send message to RocketMQ failed, %s", message));
-                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
+                throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
             }
-            message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
+            message.header().setMessageId(rmqResult.getMsgId());
             return OMSUtil.sendResultConvert(rmqResult);
         } catch (Exception e) {
             log.error(String.format("Send message to RocketMQ failed, %s", message), e);
-            throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
+            throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
         }
     }
 
@@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
     }
 
-    @Override
-    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
-        long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
-            ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
-        return sendAsync(message, timeout);
-    }
-
     private Promise<SendResult> sendAsync(final Message message, long timeout) {
         checkMessageType(message);
-        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
         final Promise<SendResult> promise = new DefaultPromise<>();
         try {
             this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                 @Override
                 public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
-                    message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
+                    message.header().setMessageId(rmqResult.getMsgId());
                     promise.set(OMSUtil.sendResultConvert(rmqResult));
                 }
 
@@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     @Override
     public void sendOneway(final Message message) {
         checkMessageType(message);
-        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
         try {
             this.rocketmqProducer.sendOneway(rmqMessage);
         } catch (Exception ignore) { //Ignore the oneway exception.
@@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     }
 
     @Override
-    public void sendOneway(final Message message, final KeyValue properties) {
-        sendOneway(message);
+    public void send(List<Message> messages) {
+        if (messages == null || messages.isEmpty()) {
+            throw new OMSMessageFormatException(-1, "The messages collection is empty");
+        }
+
+        for (Message message : messages) {
+            sendOneway(messages);
+        }
     }
 
     @Override
-    public BatchMessageSender createBatchMessageSender() {
+    public Future<SendResult> sendAsync(List<Message> messages) {
         return null;
     }
 
     @Override
-    public void addInterceptor(ProducerInterceptor interceptor) {
+    public void sendOneway(List<Message> messages) {
+        if (messages == null || messages.isEmpty()) {
+            throw new OMSMessageFormatException(-1, "The messages collection is empty");
+        }
 
+        for (Message message : messages) {
+            sendOneway(messages);
+        }
+    }
+
+    @Override
+    public void addInterceptor(ProducerInterceptor interceptor) {
     }
 
     @Override
     public void removeInterceptor(ProducerInterceptor interceptor) {
+    }
+
+    @Override
+    public TransactionalResult prepare(Message message) {
+        return null;
+    }
+
+    @Override
+    public ServiceLifeState currentState() {
+        return null;
+    }
+
+    @Override
+    public Optional<Extension> getExtension() {
+        return null;
+    }
 
+    @Override
+    public QueueMetaData getQueueMetaData(String queueName) {
+        return null;
+    }
+
+    @Override
+    public Message createMessage(String queueName, byte[] body) {
+        Message message = new BytesMessageImpl();
+        message.setData(body);
+        message.header().setDestination(queueName);
+        return message;
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index c1b5999..e472d2a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -175,7 +175,7 @@ public class DefaultPromise<V> implements Promise<V> {
     private V getValueOrThrowable() {
         if (exception != null) {
             Throwable e = exception.getCause() != null ? exception.getCause() : exception;
-            throw new OMSRuntimeException("-1", e);
+            throw new OMSRuntimeException(-1, e);
         }
         notifyListeners();
         return result;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
index 66af8ce..5b095ee 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -16,18 +16,15 @@
  */
 package io.openmessaging.rocketmq.utils;
 
-import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
-import io.openmessaging.Message.BuiltinKeys;
 import io.openmessaging.OMS;
+import io.openmessaging.message.Header;
 import io.openmessaging.producer.SendResult;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
 import io.openmessaging.rocketmq.domain.RocketMQConstants;
 import io.openmessaging.rocketmq.domain.SendResultImpl;
 import java.lang.reflect.Field;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.UtilAll;
@@ -44,68 +41,55 @@ public class OMSUtil {
         return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
     }
 
-    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessageImpl omsMessage) {
         org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
-        rmqMessage.setBody(omsMessage.getBody(byte[].class));
+        rmqMessage.setBody(omsMessage.getData());
 
-        KeyValue sysHeaders = omsMessage.sysHeaders();
-        KeyValue userHeaders = omsMessage.userHeaders();
+        Header sysHeaders = omsMessage.header();
+        KeyValue userHeaders = omsMessage.properties();
 
         //All destinations in RocketMQ use Topic
-        rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
+        rmqMessage.setTopic(sysHeaders.getDestination());
 
-        if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
-            long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
-            if (deliverTime > 0) {
-                rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
-            }
+        long deliverTime = sysHeaders.getBornTimestamp();
+        if (deliverTime > 0) {
+            rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
         }
 
+
         for (String key : userHeaders.keySet()) {
             MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
         }
 
-        //System headers has a high priority
-        for (String key : sysHeaders.keySet()) {
-            MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
-        }
-
+        MessageAccessor.putProperty(rmqMessage, RocketMQConstants.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(sysHeaders.getDeliveryCount()));
         return rmqMessage;
     }
 
-    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
-        BytesMessage omsMsg = new BytesMessageImpl();
-        omsMsg.setBody(rmqMsg.getBody());
-
-        KeyValue headers = omsMsg.sysHeaders();
-        KeyValue properties = omsMsg.userHeaders();
+    public static BytesMessageImpl msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
+        BytesMessageImpl omsMsg = new BytesMessageImpl();
+        omsMsg.setData(rmqMsg.getBody());
 
         final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
 
         for (final Map.Entry<String, String> entry : entries) {
-            if (isOMSHeader(entry.getKey())) {
-                headers.put(entry.getKey(), entry.getValue());
-            } else {
-                properties.put(entry.getKey(), entry.getValue());
+            if (!isOMSHeader(entry.getKey())) {
+                omsMsg.properties().put(entry.getKey(), entry.getValue());
             }
         }
 
-        omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
-
-        omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
+        omsMsg.header().setMessageId(rmqMsg.getMsgId());
+        omsMsg.header().setDestination(rmqMsg.getTopic());
+        omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
+        omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
 
-        omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
-        omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
-        omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
-        omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
-        omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
         return omsMsg;
     }
 
     public static boolean isOMSHeader(String value) {
-        for (Field field : BuiltinKeys.class.getDeclaredFields()) {
+        for (Field field : Header.class.getDeclaredFields()) {
             try {
-                if (field.get(BuiltinKeys.class).equals(value)) {
+                if (field.get(Header.class).equals(value)) {
                     return true;
                 }
             } catch (IllegalAccessException e) {
@@ -132,49 +116,4 @@ public class OMSUtil {
         }
         return keyValue;
     }
-
-    /**
-     * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
-     */
-    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
-        return new Iterator<T>() {
-            Iterator<T> iterator = new Iterator<T>() {
-                @Override
-                public synchronized boolean hasNext() {
-                    return false;
-                }
-
-                @Override
-                public synchronized T next() {
-                    throw new NoSuchElementException();
-                }
-
-                @Override
-                public synchronized void remove() {
-                    //Ignore
-                }
-            };
-
-            @Override
-            public synchronized boolean hasNext() {
-                return iterator.hasNext() || iterable.iterator().hasNext();
-            }
-
-            @Override
-            public synchronized T next() {
-                if (!iterator.hasNext()) {
-                    iterator = iterable.iterator();
-                    if (!iterator.hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-                }
-                return iterator.next();
-            }
-
-            @Override
-            public synchronized void remove() {
-                iterator.remove();
-            }
-        };
-    }
 }
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index fc6515e..61178c9 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -56,9 +56,7 @@ public class ProducerImplTest {
         Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
         field.setAccessible(true);
         field.set(producer, rocketmqProducer);
-
-        messagingAccessPoint.startup();
-        producer.startup();
+        producer.start();
     }
 
     @Test
@@ -68,7 +66,7 @@ public class ProducerImplTest {
         sendResult.setSendStatus(SendStatus.SEND_OK);
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
         io.openmessaging.producer.SendResult omsResult =
-            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
 
         assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
     }
@@ -80,7 +78,7 @@ public class ProducerImplTest {
 
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
         try {
-            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
@@ -91,7 +89,7 @@ public class ProducerImplTest {
     public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
         try {
-            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index f226ede..0f6414f 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -87,7 +87,7 @@ public class DefaultPromiseTest {
 
     @Test
     public void testAddListener_WithException_ListenerAfterSet() throws Exception {
-        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        final Throwable exception = new OMSRuntimeException(-1, "Test Error");
         promise.setFailure(exception);
         promise.addListener(new FutureListener<String>() {
             @Override
@@ -99,7 +99,7 @@ public class DefaultPromiseTest {
 
     @Test
     public void testAddListener_WithException() throws Exception {
-        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        final Throwable exception = new OMSRuntimeException(-1, "Test Error");
         promise.addListener(new FutureListener<String>() {
             @Override
             public void operationComplete(Future<String> future) {
@@ -112,7 +112,7 @@ public class DefaultPromiseTest {
     @Test
     public void getThrowable() throws Exception {
         assertThat(promise.getThrowable()).isNull();
-        Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        Throwable exception = new OMSRuntimeException(-1, "Test Error");
         promise.setFailure(exception);
         assertThat(promise.getThrowable()).isEqualTo(exception);
     }