Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/11/08 16:57:52 UTC

[rocketmq-cloudevents] branch master updated (e7549a7 -> 3418201)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git.


    from e7549a7  open issue and wiki and add gitignore
     new a35ab25  Add rocketmq and cloudevents binding
     new 0072f59  [ISSUE #810]CloudEvents binding polish
     new 3418201  Merge branch 'master' of github.com:apache/rocketmq-cloudevents

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            | 177 ++++++++++++++
 rocketmq-transport-binding.md                      | 234 ++++++++++++++++++
 .../cloudevent/RocketMQMessageFactory.java         |  71 ++++++
 .../impl/RocketMQBinaryMessageReader.java          |  63 +++++
 .../rocketmq/cloudevent/impl/RocketMQHeaders.java  |  35 +++
 .../cloudevent/impl/RocketMQMessageWriter.java     |  99 ++++++++
 .../cloudevent/RocketMQMessageWriterTest.java      | 270 +++++++++++++++++++++
 .../cloudevent/RocketmqMessageFactoryTest.java     | 237 ++++++++++++++++++
 8 files changed, 1186 insertions(+)
 create mode 100644 pom.xml
 create mode 100644 rocketmq-transport-binding.md
 create mode 100644 src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
 create mode 100644 src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
 create mode 100644 src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
 create mode 100644 src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
 create mode 100644 src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
 create mode 100644 src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java

[rocketmq-cloudevents] 01/03: Add rocketmq and cloudevents binding

Posted by du...@apache.org.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git

commit a35ab252adc1c5a4c457117fa37b8396152922f0
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Mar 29 13:31:15 2019 +0800

    Add rocketmq and cloudevents binding
---
 rocketmq-transport-binding.md | 234 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 234 insertions(+)

diff --git a/rocketmq-transport-binding.md b/rocketmq-transport-binding.md
new file mode 100644
index 0000000..a9d3df6
--- /dev/null
+++ b/rocketmq-transport-binding.md
@@ -0,0 +1,234 @@
+# RocketMQ Transport Binding for CloudEvents
+
+## Abstract
+
+The [RocketMQ][RocketMQ] Transport Binding for CloudEvents defines how events are mapped 
+to RocketMQ messages.
+
+## Status of this document
+
+This document is a working draft.
+
+## Table of Contents
+1. [Introduction](#1-introduction)
+- 1.1. [Conformance](#11-conformance)
+- 1.2. [Relation to RocketMQ](#12-relation-to-rocketmq)
+- 1.3. [Content Modes](#13-content-modes)
+- 1.4. [Event Formats](#14-event-formats)
+- 1.5. [Security](#15-security)
+2. [Use of CloudEvents Attributes](#2-use-of-cloudevents-attributes)
+- 2.1. [contenttype Attribute](#21-contenttype-attribute)
+- 2.2. [data Attribute](#22-data-attribute)
+3. [RocketMQ Message Mapping](#3-rocketmq-message-mapping)
+- 3.1. [Binary Content Mode](#31-binary-content-mode)
+- 3.2. [Structured Content Mode](#32-structured-content-mode)
+4. [References](#4-references)
+
+## 1. Introduction
+[CloudEvents][CE] is a standardized and transport-neutral definition of the 
+structure and metadata description of events. This specification defines how 
+the elements defined in the CloudEvents specification are to be used in the 
+RocketMQ Message protocol as client produced and consumed messages.
+
+### 1.1. Conformance
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", 
+"SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be 
+interpreted as described in [RFC2119][RFC2119].
+
+### 1.2. Relation to RocketMQ
+
+This specification does not prescribe rules constraining transfer or settlement 
+of event messages with RocketMQ; it solely defines how CloudEvents are expressed 
+in the RocketMQ message transport protocol as client messages that are produced and consumed.
+
+### 1.3. Content Modes
+
+The specification defines two content modes for transferring events:
+*structured* and *binary*.
+
+The RocketMQ protocol already supports custom message headers, 
+which the *binary* mode requires.
+
+In the *structured* content mode, event metadata attributes and event data are
+placed into the RocketMQ message payload using an [event format](#14-event-formats).
+
+### 1.4. Event Formats
+
+Event formats, used with the *structured* content mode, define how an event is
+expressed in a particular data format. All implementations of this
+specification MUST support the [JSON event format][JSON-format].
+
+### 1.5. Security
+This specification does not introduce any new security features for RocketMQ, 
+or mandate specific existing features to be used.
+
+## 2. Use of CloudEvents Attributes
+
+This specification does not further define any of the [CloudEvents][CE] event
+attributes.
+
+### 2.1. contenttype Attribute
+
+The `contenttype` attribute is assumed to contain a media-type expression
+compliant with [RFC2046][RFC2046].
+
+### 2.2. data Attribute
+The `data` attribute is assumed to contain opaque application data that is 
+encoded as declared by the `contenttype` attribute.
+
+An application is free to hold the information in any in-memory representation 
+of its choosing, but as the value is transposed into RocketMQ as defined in this 
+specification, core RocketMQ provides data available as a sequence of bytes.
+
+For instance, if the declared `contenttype` is
+`application/json;charset=utf-8`, the expectation is that the `data` attribute
+value is made available as [UTF-8][RFC3629] encoded JSON text.
+
+## 3. RocketMQ Message Mapping
+The receiver of the event can distinguish between the two content modes by inspecting 
+the `CE_contenttype` property of the RocketMQ message. If the value is prefixed with the 
+CloudEvents media type `application/cloudevents`, indicating the use of a known event format, 
+the receiver uses structured mode; otherwise it defaults to binary mode.
+
+If a receiver finds a CloudEvents media type as per the above rule, but with an event format 
+that it cannot handle, for instance `application/cloudevents+avro`, it MAY still treat the event 
+as binary and forward it to another party as-is.
+
+### 3.1. Binary Content Mode
+
+The [binary content mode](#31-binary-content-mode) accommodates any shape of event data, 
+and allows for efficient transfer without transcoding effort.
+
+#### 3.1.1. Content Type
+
+For the binary mode, the `CE_contenttype` property MUST be mapped directly to the CloudEvents 
+`contenttype` attribute.
+
+#### 3.1.2. Event Data Encoding
+
+The data attribute byte-sequence MUST be used as the value of the RocketMQ message.
+
+#### 3.1.3. Metadata Headers
+
+All CloudEvents attributes and CloudEvents attribute extensions, with the exception of 
+`data`, MUST be individually mapped to and from the header fields in the RocketMQ message.
+
+##### 3.1.3.1 Property Names
+
+[CloudEvents][CE] attributes are prefixed with `"CE_"` for use in the message section.
+
+Examples:
+
+    * `time` maps to `CE_time`
+    * `id` maps to `CE_id`
+    * `specversion` maps to `CE_specversion`
+
+##### 3.1.3.2 Property Values
+
+The value for each RocketMQ message header is constructed from the respective attribute's RocketMQ 
+representation, compliant with the RocketMQ message format specification.
+
+#### 3.1.4 Example
+
+This example shows the binary mode mapping of an event into the RocketMQ message. 
+All other CloudEvents attributes are mapped to RocketMQ message property fields with prefix `CE_`.
+
+Note that `CE_contenttype` here refers to the content type of the event data carried in the payload.
+
+``` text
+------------------ Message -------------------
+
+Topic: mytopic
+
+
+-------------- user properties ---------------
+
+CE_contenttype: application/avro
+CE_specversion: "0.1"
+CE_type: "com.example.someevent"
+CE_time: "2018-11-23T03:56:24Z"
+CE_id: "1234-1234-1234"
+CE_source: "/mycontext/subcontext"
+       .... further attributes ...
+
+------------------- value --------------------
+
+            ... application data ...
+
+-----------------------------------------------
+```
+
+### 3.2. Structured Content Mode
+
+The [structured content mode](#32-structured-content-mode) keeps event metadata and 
+data together in the payload, allowing simple forwarding of the same event across 
+multiple routing hops, and across multiple transports.
+
+#### 3.2.1. RocketMQ Content-Type
+
+The [RocketMQ][RocketMQ] `CE_contenttype` property field MUST be set to the media type 
+of an event format.
+
+Example for the JSON format:
+
+```
+CE_contenttype: application/cloudevents+json; charset=UTF-8
+
+```
+
+#### 3.2.2. Event Data Encoding
+The chosen event format defines how all attributes, including the payload, are represented. 
+The RocketMQ message header describes the type of the transported event.
+
+The event metadata and data MAY then be rendered in accordance with the event format 
+specification and the resulting data becomes the payload.
+
+#### 3.2.3. Metadata Headers
+
+Implementations MAY include the same RocketMQ headers as defined for the binary mode.
+
+#### 3.2.4. Example
+This example shows a JSON event format encoded structured data event:
+
+``` text
+------------------ Message ---------------------------
+
+Topic: mytopic
+
+------------------ user properties -------------------
+
+CE_contenttype: application/cloudevents+json; charset=UTF-8
+
+------------------ value -----------------------------
+
+{
+    "cloudEventsVersion" : "0.1",
+    "eventType" : "com.example.someevent",
+
+    ... further attributes omitted ...
+
+    "data" : {
+        ... application data ...
+    }
+}
+
+------------------------------------------------------
+```
+
+## 4. References
+
+- [RocketMQ][RocketMQ] The RocketMQ Messaging System
+- [RFC2046][RFC2046] Multipurpose Internet Mail Extensions (MIME) Part Two: Media Types
+- [RFC2119][RFC2119] Key words for use in RFCs to Indicate Requirement Levels
+- [RFC3629][RFC3629] UTF-8, a transformation format of ISO 10646
+- [RFC7159][RFC7159] The JavaScript Object Notation (JSON) Data Interchange Format
+
+[CE]: ./spec.md
+[JSON-format]: ./json-format.md
+[RocketMQ]: http://rocketmq.apache.org/
+[JSON-Value]: https://tools.ietf.org/html/rfc7159#section-3
+[RFC2046]: https://tools.ietf.org/html/rfc2046
+[RFC2119]: https://tools.ietf.org/html/rfc2119
+[RFC3629]: https://tools.ietf.org/html/rfc3629
+[RFC7159]: https://tools.ietf.org/html/rfc7159
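
Section 3 of the binding document above describes how a receiver picks the content mode from the `CE_contenttype` property. The following is a minimal sketch of that rule, not code from this repository. The class name `ContentModeSketch`, the `Mode` enum and the case-insensitive comparison are assumptions made for illustration; it uses only the JDK and a plain `Map` in place of a RocketMQ message.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

// Illustrative only: not part of the rocketmq-cloudevents sources.
final class ContentModeSketch {

    enum Mode { STRUCTURED, BINARY }

    // Property name used throughout the binding document.
    static final String CONTENT_TYPE_PROPERTY = "CE_contenttype";

    // Media type prefix that marks a CloudEvents event format.
    static final String CLOUDEVENTS_PREFIX = "application/cloudevents";

    static Mode detect(Map<String, String> userProperties) {
        String contentType = userProperties.get(CONTENT_TYPE_PROPERTY);
        // Assumption: media types are compared case-insensitively.
        if (contentType != null
                && contentType.trim().toLowerCase(Locale.ROOT).startsWith(CLOUDEVENTS_PREFIX)) {
            return Mode.STRUCTURED;
        }
        // Anything else, including a missing property, falls back to binary mode.
        return Mode.BINARY;
    }

    public static void main(String[] args) {
        System.out.println(detect(Collections.singletonMap(
                CONTENT_TYPE_PROPERTY, "application/cloudevents+json; charset=UTF-8")));
        System.out.println(detect(Collections.singletonMap(
                CONTENT_TYPE_PROPERTY, "application/avro")));
    }
}
```

A receiver that cannot decode the detected event format (for example `application/cloudevents+avro`) could still forward the message unchanged, as the document permits.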

[rocketmq-cloudevents] 03/03: Merge branch 'master' of github.com:apache/rocketmq-cloudevents

Posted by du...@apache.org.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git

commit 34182019d231bb127cc5ec54b17733cbb38e9fd9
Merge: 0072f59 e7549a7
Author: duhenglucky <du...@apache.org>
AuthorDate: Tue Nov 9 00:57:15 2021 +0800

    Merge branch 'master' of github.com:apache/rocketmq-cloudevents
    
    * 'master' of github.com:apache/rocketmq-cloudevents:
      open issue and wiki and add gitignore
      Update README.md
      Initialize project.

 .asf.yaml  |  8 ++++++++
 .gitignore | 15 +++++++++++++++
 README.md  | 30 ++++++++++++++++++++++++++++++
 3 files changed, 53 insertions(+)

[rocketmq-cloudevents] 02/03: [ISSUE #810]CloudEvents binding polish

Posted by du...@apache.org.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git

commit 0072f59ff80e9caa6f83db2cbf444f8a1615221e
Author: wangshaojie4039 <15...@163.com>
AuthorDate: Wed Sep 8 12:44:42 2021 +0800

    [ISSUE #810]CloudEvents binding polish
    
    Co-authored-by: wangshaojie <wa...@cmss.chinamobile.com>
---
 pom.xml                                            | 177 ++++++++++++++
 .../cloudevent/RocketMQMessageFactory.java         |  71 ++++++
 .../impl/RocketMQBinaryMessageReader.java          |  63 +++++
 .../rocketmq/cloudevent/impl/RocketMQHeaders.java  |  35 +++
 .../cloudevent/impl/RocketMQMessageWriter.java     |  99 ++++++++
 .../cloudevent/RocketMQMessageWriterTest.java      | 270 +++++++++++++++++++++
 .../cloudevent/RocketmqMessageFactoryTest.java     | 237 ++++++++++++++++++
 7 files changed, 952 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5236365
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.rocketmq</groupId>
+	<artifactId>rocketmq-cloudevents-binding</artifactId>
+	<version>1.0.0</version>
+
+	<name>rocketmq-cloudevents-binding</name>
+	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-cloudevents-binding</url>
+
+	<licenses>
+		<license>
+			<name>The Apache Software License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+		</license>
+	</licenses>
+
+	<issueManagement>
+		<system>jira</system>
+		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
+	</issueManagement>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+		<!-- Compiler settings properties -->
+		<maven.compiler.source>1.8</maven.compiler.source>
+		<maven.compiler.target>1.8</maven.compiler.target>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>versions-maven-plugin</artifactId>
+				<version>2.3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>clirr-maven-plugin</artifactId>
+				<version>2.7</version>
+			</plugin>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.6.1</version>
+				<configuration>
+					<source>${maven.compiler.source}</source>
+					<target>${maven.compiler.target}</target>
+					<compilerVersion>${maven.compiler.source}</compilerVersion>
+					<showDeprecation>true</showDeprecation>
+					<showWarnings>true</showWarnings>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19.1</version>
+				<configuration>
+					<argLine>-Xms512m -Xmx1024m</argLine>
+					<forkMode>always</forkMode>
+					<includes>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-site-plugin</artifactId>
+				<version>3.6</version>
+				<configuration>
+					<locales>en_US</locales>
+					<outputEncoding>UTF-8</outputEncoding>
+					<inputEncoding>UTF-8</inputEncoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-source-plugin</artifactId>
+				<version>3.0.1</version>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<version>2.10.4</version>
+				<configuration>
+					<charset>UTF-8</charset>
+					<locale>en_US</locale>
+					<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+				</configuration>
+				<executions>
+					<execution>
+						<id>aggregate</id>
+						<goals>
+							<goal>aggregate</goal>
+						</goals>
+						<phase>site</phase>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<version>3.0.2</version>
+				<configuration>
+					<encoding>${project.build.sourceEncoding}</encoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>findbugs-maven-plugin</artifactId>
+				<version>3.0.4</version>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.9.0</version>
+		</dependency>
+		<dependency>
+			<groupId>io.cloudevents</groupId>
+			<artifactId>cloudevents-core</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<version>3.0.2</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter</artifactId>
+			<version>5.7.0</version>
+		</dependency>
+		<dependency>
+			<groupId>io.cloudevents</groupId>
+			<artifactId>cloudevents-core</artifactId>
+			<classifier>tests</classifier>
+			<type>test-jar</type>
+			<version>2.2.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<version>3.16.1</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
new file mode 100644
index 0000000..9d728c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import org.apache.rocketmq.cloudevent.impl.RocketMQBinaryMessageReader;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.cloudevent.impl.RocketMQMessageWriter;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.lang.Nullable;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Map;
+
+
+@ParametersAreNonnullByDefault
+public final class RocketMQMessageFactory {
+
+    private RocketMQMessageFactory() {
+        // prevent instantiation
+    }
+
+    public static MessageReader createReader(final Message message) throws CloudEventRWException {
+        return createReader(message.getProperties(), message.getBody());
+    }
+
+
+    public static MessageReader createReader(final Map<String, String> props, @Nullable final byte[] body) throws CloudEventRWException {
+
+        return MessageUtils.parseStructuredOrBinaryMessage(
+                () -> props.get(RocketMQHeaders.CONTENT_TYPE),
+                format -> new GenericStructuredMessageReader(format, body),
+                () -> props.get(RocketMQHeaders.SPEC_VERSION),
+                sv -> new RocketMQBinaryMessageReader(sv, props, body)
+        );
+    }
+
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic) {
+        return new RocketMQMessageWriter<>(topic);
+    }
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys) {
+        return new RocketMQMessageWriter<>(topic, keys);
+    }
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys, String tags) {
+        return new RocketMQMessageWriter<>(topic, keys, tags);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
new file mode 100644
index 0000000..f48f9ee
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+public class RocketMQBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, String> {
+
+    private final Map<String, String> headers;
+
+    public RocketMQBinaryMessageReader(SpecVersion version, Map<String, String> headers, byte[] payload) {
+        super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
+
+        Objects.requireNonNull(headers);
+        this.headers = headers;
+    }
+
+    @Override
+    protected boolean isContentTypeHeader(String key) {
+        return key.equals(RocketMQHeaders.CONTENT_TYPE);
+    }
+
+    @Override
+    protected boolean isCloudEventsHeader(String key) {
+        return key.length() > RocketMQHeaders.CE_PREFIX.length() && key.startsWith(RocketMQHeaders.CE_PREFIX);
+    }
+
+    @Override
+    protected String toCloudEventsKey(String key) {
+        return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
+    }
+
+    @Override
+    protected void forEachHeader(BiConsumer<String, String> fn) {
+        this.headers.forEach((k, v) -> fn.accept(k, v));
+    }
+
+    @Override
+    protected String toCloudEventsValue(String value) {
+        return value;
+    }
+}
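
The prefix handling in the reader above can be tried in isolation. The sketch below is an illustration under stated assumptions, not the committed implementation: `HeaderMappingSketch` and `toAttributes` are hypothetical names, it depends only on the JDK, and it leaves out the separate content type handling that the real reader delegates to `isContentTypeHeader`.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Illustrative only: mirrors the CE_ prefix handling without cloudevents-core.
final class HeaderMappingSketch {

    static final String CE_PREFIX = "CE_";

    // Keeps only CE_-prefixed properties and strips the prefix from their names.
    static Map<String, String> toAttributes(Map<String, String> userProperties) {
        Map<String, String> attributes = new LinkedHashMap<>();
        userProperties.forEach((key, value) -> {
            // A bare "CE_" key carries no attribute name, so it is skipped.
            if (key.length() > CE_PREFIX.length() && key.startsWith(CE_PREFIX)) {
                attributes.put(key.substring(CE_PREFIX.length()).toLowerCase(Locale.ROOT), value);
            }
        });
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("CE_id", "1234-1234-1234");
        props.put("CE_type", "com.example.someevent");
        props.put("KEYS", "order-42"); // hypothetical non-CloudEvents property, skipped
        System.out.println(toAttributes(props));
    }
}
```
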
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
new file mode 100644
index 0000000..f292777
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.core.v1.CloudEventV1;
+
+import java.util.Map;
+
+public class RocketMQHeaders {
+
+    public static final String CE_PREFIX = "CE_";
+
+    protected static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);
+
+    public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
+
+    public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);
+
+}
+
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
new file mode 100644
index 0000000..6bbfc71
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+
+public final class RocketMQMessageWriter<R> implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {
+
+    private Message message;
+
+
+    public RocketMQMessageWriter(String topic) {
+        message = new Message();
+        message.setTopic(topic);
+    }
+
+    public RocketMQMessageWriter(String topic, String keys) {
+        message = new Message();
+
+        message.setTopic(topic);
+
+        if (keys != null && keys.length() > 0) {
+            message.setKeys(keys);
+        }
+    }
+
+    public RocketMQMessageWriter(String topic, String keys, String tags) {
+        message = new Message();
+
+        message.setTopic(topic);
+
+        if (tags != null && tags.length() > 0) {
+            message.setTags(tags);
+        }
+
+        if (keys != null && keys.length() > 0) {
+            message.setKeys(keys);
+        }
+    }
+
+
+    @Override
+    public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+
+        String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
+        if (propName == null) {
+            propName = RocketMQHeaders.CE_PREFIX + name;
+        }
+        message.putUserProperty(propName, value);
+        return this;
+    }
+
+    @Override
+    public RocketMQMessageWriter<R> create(final SpecVersion version) {
+        message.putUserProperty(RocketMQHeaders.SPEC_VERSION, version.toString());
+        return this;
+    }
+
+    @Override
+    public Message setEvent(final EventFormat format, final byte[] value) throws CloudEventRWException {
+        message.putUserProperty(RocketMQHeaders.CONTENT_TYPE, format.serializedContentType());
+        message.setBody(value);
+        return message;
+    }
+
+    @Override
+    public Message end(final CloudEventData data) throws CloudEventRWException {
+        message.setBody(data.toBytes());
+        return message;
+    }
+
+    @Override
+    public Message end() {
+        message.setBody(null);
+        return message;
+    }
+}
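
For the opposite direction, the writer above stores each context attribute as a prefixed user property and the event data as the message body. A rough, dependency-free sketch of that binary-mode shape follows. `BinaryWriteSketch` and `SketchMessage` are hypothetical stand-ins for illustration, not the RocketMQ `Message` class or the committed writer, and the simple `CE_` plus attribute name mapping is an assumption that ignores any special-cased header names.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a stand-in for a RocketMQ message, not the real Message class.
final class BinaryWriteSketch {

    static final String CE_PREFIX = "CE_";

    static final class SketchMessage {
        final String topic;
        final Map<String, String> userProperties = new LinkedHashMap<>();
        byte[] body;

        SketchMessage(String topic) {
            this.topic = topic;
        }
    }

    // Copies each attribute into a CE_-prefixed property and uses the data bytes as the body.
    static SketchMessage writeBinary(String topic, Map<String, String> attributes, byte[] data) {
        SketchMessage message = new SketchMessage(topic);
        attributes.forEach((name, value) -> message.userProperties.put(CE_PREFIX + name, value));
        message.body = data; // may be null when the event carries no data
        return message;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("specversion", "1.0");
        attributes.put("id", "1234-1234-1234");
        attributes.put("type", "com.example.someevent");
        attributes.put("source", "/mycontext/subcontext");

        SketchMessage message = writeBinary("mytopic", attributes,
                "{\"hello\":\"world\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(message.topic + " " + message.userProperties);
    }
}
```
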
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
new file mode 100644
index 0000000..cbab95d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.StructuredMessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketMQMessageWriterTest {
+
+    private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+    private static final String DATACONTENTTYPE_NULL = null;
+    private static final byte[] DATAPAYLOAD_NULL = null;
+
+
+    @ParameterizedTest
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    void testRequestWithStructured(CloudEvent event) {
+        String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
+        byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = StructuredMessageReader
+                .from(event, CSVFormat.INSTANCE)
+                .read(RocketMQMessageFactory.createWriter(topic, keys, tags));
+
+        assertThat(message.getTopic())
+                .isEqualTo(topic);
+        assertThat(message.getKeys())
+                .isEqualTo(keys);
+        assertThat(message.getTags())
+                .isEqualTo(tags);
+        assertThat(message.getBody())
+                .isEqualTo(expectedBuffer);
+    }
+
+    @ParameterizedTest
+    @MethodSource("binaryTestArguments")
+    void testRequestWithBinary(CloudEvent event, Map<String, String> expectedHeaders, byte[] expectedBody) {
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = RocketMQMessageFactory
+                .createWriter(topic, keys, tags)
+                .writeBinary(event);
+
+        assertThat(message.getTopic())
+                .isEqualTo(topic);
+        assertThat(message.getKeys())
+                .isEqualTo(keys);
+        assertThat(message.getTags())
+                .isEqualTo(tags);
+        assertThat(message.getBody())
+                .isEqualTo(expectedBody);
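+        // "ignored" is a placeholder the writer never emits, so compare only the CE-prefixed headers.
+        expectedHeaders.entrySet().stream()
+                .filter(e -> e.getKey().startsWith(RocketMQHeaders.CE_PREFIX))
+                .forEach(e -> assertThat(message.getProperties()).containsEntry(e.getKey(), e.getValue()));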
+    }
+
+    private static Stream<Arguments> binaryTestArguments() {
+
+        return Stream.of(
+                // V03
+                Arguments.of(
+                        Data.V03_MIN,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+                        DATAPAYLOAD_NULL
+                ),
+                Arguments.of(
+                        Data.V03_WITH_JSON_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_JSON_DATA_WITH_EXT_STRING,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_XML_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_XML_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_TEXT_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_TEXT_SERIALIZED
+
+                ),
+                // V1
+                Arguments.of(
+                        Data.V1_MIN,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+
+                        DATAPAYLOAD_NULL
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_JSON_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_JSON_DATA_WITH_EXT_STRING,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_XML_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_XML_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_TEXT_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_TEXT_SERIALIZED
+
+                )
+        );
+    }
+
+    private static AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        return name.equalsIgnoreCase("ignored") ?
+                new AbstractMap.SimpleEntry<>(name, value) :
+                new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+    }
+
+    @SafeVarargs
+    private static Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        return Stream.of(entries)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
new file mode 100644
index 0000000..25fa69a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketmqMessageFactoryTest {
+
+    private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+    private static final String DATACONTENTTYPE_NULL = null;
+    private static final byte[] DATAPAYLOAD_NULL = null;
+
+    @ParameterizedTest
+    @MethodSource("binaryTestArguments")
+    public void readBinary(final Map<String, String> props, final String contentType, final byte[] body,
+                           final CloudEvent event) {
+        props.put(RocketMQHeaders.CONTENT_TYPE, contentType);
+        final MessageReader reader = RocketMQMessageFactory.createReader(props, body);
+        assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
+        assertThat(reader.toEvent()).isEqualTo(event);
+    }
+
+    @ParameterizedTest
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    public void readStructured(final CloudEvent event) {
+        final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
+        final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);
+        Map<String, String> properties = new HashMap<>();
+        properties.put(RocketMQHeaders.CONTENT_TYPE, contentType);
+        final MessageReader reader = RocketMQMessageFactory.createReader(properties, contentPayload);
+        assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
+        assertThat(reader.toEvent()).isEqualTo(event);
+    }
+
+    private static Stream<Arguments> binaryTestArguments() {
+
+        return Stream.of(
+                // V03
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+                        DATACONTENTTYPE_NULL,
+                        DATAPAYLOAD_NULL,
+                        Data.V03_MIN
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V03_WITH_JSON_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_XML,
+                        Data.DATA_XML_SERIALIZED,
+                        Data.V03_WITH_XML_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_TEXT,
+                        Data.DATA_TEXT_SERIALIZED,
+                        Data.V03_WITH_TEXT_DATA
+                ),
+                // V1
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+                        DATACONTENTTYPE_NULL,
+                        DATAPAYLOAD_NULL,
+                        Data.V1_MIN
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V1_WITH_JSON_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_XML,
+                        Data.DATA_XML_SERIALIZED,
+                        Data.V1_WITH_XML_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_TEXT,
+                        Data.DATA_TEXT_SERIALIZED,
+                        Data.V1_WITH_TEXT_DATA
+                )
+        );
+    }
+
+    private static AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        return name.equalsIgnoreCase("ignored") ?
+                new AbstractMap.SimpleEntry<>(name, value) :
+                new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+    }
+
+    @SafeVarargs
+    private static Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        return Stream.of(entries)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+    }
+}