You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/11/08 16:57:54 UTC

[rocketmq-cloudevents] 02/03: [ISSUE #810]CloudEvents binding polish

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git

commit 0072f59ff80e9caa6f83db2cbf444f8a1615221e
Author: wangshaojie4039 <15...@163.com>
AuthorDate: Wed Sep 8 12:44:42 2021 +0800

    [ISSUE #810]CloudEvents binding polish
    
    Co-authored-by: wangshaojie <wa...@cmss.chinamobile.com>
---
 pom.xml                                            | 177 ++++++++++++++
 .../cloudevent/RocketMQMessageFactory.java         |  71 ++++++
 .../impl/RocketMQBinaryMessageReader.java          |  63 +++++
 .../rocketmq/cloudevent/impl/RocketMQHeaders.java  |  35 +++
 .../cloudevent/impl/RocketMQMessageWriter.java     |  99 ++++++++
 .../cloudevent/RocketMQMessageWriterTest.java      | 270 +++++++++++++++++++++
 .../cloudevent/RocketmqMessageFactoryTest.java     | 237 ++++++++++++++++++
 7 files changed, 952 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5236365
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.rocketmq</groupId>
+	<artifactId>rocketmq-cloudevents-binding</artifactId>
+	<version>1.0.0</version>
+
+	<name>rocketmq-cloudevents-binding</name>
+	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-cloudevents-binding</url>
+
+	<licenses>
+		<license>
+			<name>The Apache Software License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+		</license>
+	</licenses>
+
+	<issueManagement>
+		<system>jira</system>
+		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
+	</issueManagement>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+		<!-- Compiler settings properties -->
+		<maven.compiler.source>1.8</maven.compiler.source>
+		<maven.compiler.target>1.8</maven.compiler.target>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>versions-maven-plugin</artifactId>
+				<version>2.3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>clirr-maven-plugin</artifactId>
+				<version>2.7</version>
+			</plugin>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.6.1</version>
+				<configuration>
+					<source>${maven.compiler.source}</source>
+					<target>${maven.compiler.target}</target>
+					<compilerVersion>${maven.compiler.source}</compilerVersion>
+					<showDeprecation>true</showDeprecation>
+					<showWarnings>true</showWarnings>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19.1</version>
+				<configuration>
+					<argLine>-Xms512m -Xmx1024m</argLine>
+					<forkMode>always</forkMode>
+					<includes>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-site-plugin</artifactId>
+				<version>3.6</version>
+				<configuration>
+					<locales>en_US</locales>
+					<outputEncoding>UTF-8</outputEncoding>
+					<inputEncoding>UTF-8</inputEncoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-source-plugin</artifactId>
+				<version>3.0.1</version>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<version>2.10.4</version>
+				<configuration>
+					<charset>UTF-8</charset>
+					<locale>en_US</locale>
+					<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+				</configuration>
+				<executions>
+					<execution>
+						<id>aggregate</id>
+						<goals>
+							<goal>aggregate</goal>
+						</goals>
+						<phase>site</phase>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<version>3.0.2</version>
+				<configuration>
+					<encoding>${project.build.sourceEncoding}</encoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>findbugs-maven-plugin</artifactId>
+				<version>3.0.4</version>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.9.0</version>
+		</dependency>
+		<dependency>
+			<groupId>io.cloudevents</groupId>
+			<artifactId>cloudevents-core</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<version>3.0.2</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter</artifactId>
+			<version>5.7.0</version>
+		</dependency>
+		<dependency>
+			<groupId>io.cloudevents</groupId>
+			<artifactId>cloudevents-core</artifactId>
+			<classifier>tests</classifier>
+			<type>test-jar</type>
+			<version>2.2.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<version>3.16.1</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
new file mode 100644
index 0000000..9d728c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import org.apache.rocketmq.cloudevent.impl.RocketMQBinaryMessageReader;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.cloudevent.impl.RocketMQMessageWriter;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.lang.Nullable;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Map;
+
+
+@ParametersAreNonnullByDefault
+public final class RocketMQMessageFactory {
+
+    private RocketMQMessageFactory() {
+        // prevent instantiation
+    }
+
+    public static MessageReader createReader(final Message message) throws CloudEventRWException {
+        return createReader(message.getProperties(), message.getBody());
+    }
+
+
+    public static MessageReader createReader(final Map<String, String> props, @Nullable final byte[] body) throws CloudEventRWException {
+
+        return MessageUtils.parseStructuredOrBinaryMessage(
+                () -> props.get(RocketMQHeaders.CONTENT_TYPE),
+                format -> new GenericStructuredMessageReader(format, body),
+                () -> props.get(RocketMQHeaders.SPEC_VERSION),
+                sv -> new RocketMQBinaryMessageReader(sv, props, body)
+        );
+    }
+
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic) {
+        return new RocketMQMessageWriter<>(topic);
+    }
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys) {
+        return new RocketMQMessageWriter<>(topic, keys);
+    }
+
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys, String tags) {
+        return new RocketMQMessageWriter<>(topic, keys, tags);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
new file mode 100644
index 0000000..f48f9ee
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+public class RocketMQBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, String> {
+
+    private final Map<String, String> headers;
+
+    public RocketMQBinaryMessageReader(SpecVersion version, Map<String, String> headers, byte[] payload) {
+        super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
+
+        Objects.requireNonNull(headers);
+        this.headers = headers;
+    }
+
+    @Override
+    protected boolean isContentTypeHeader(String key) {
+        return key.equals(RocketMQHeaders.CONTENT_TYPE);
+    }
+
+    @Override
+    protected boolean isCloudEventsHeader(String key) {
+        return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()).startsWith(RocketMQHeaders.CE_PREFIX);
+    }
+
+    @Override
+    protected String toCloudEventsKey(String key) {
+        return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
+    }
+
+    @Override
+    protected void forEachHeader(BiConsumer<String, String> fn) {
+        this.headers.forEach((k, v) -> fn.accept(k, v));
+    }
+
+    @Override
+    protected String toCloudEventsValue(String value) {
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
new file mode 100644
index 0000000..f292777
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.core.v1.CloudEventV1;
+
+import java.util.Map;
+
+public class RocketMQHeaders {
+
+    public static final String CE_PREFIX = "CE_";
+
+    protected static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);
+
+    public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
+
+    public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);
+
+}
+
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
new file mode 100644
index 0000000..6bbfc71
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+
+public final class RocketMQMessageWriter<R> implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {
+
+    private Message message;
+
+
+    public RocketMQMessageWriter(String topic) {
+        message = new Message();
+        message.setTopic(topic);
+    }
+
+    public RocketMQMessageWriter(String topic, String keys) {
+        message = new Message();
+
+        message.setTopic(topic);
+
+        if (keys != null && keys.length() > 0) {
+            message.setKeys(keys);
+        }
+    }
+
+    public RocketMQMessageWriter(String topic, String keys, String tags) {
+        message = new Message();
+
+        message.setTopic(topic);
+
+        if (tags != null && tags.length() > 0) {
+            message.setTags(tags);
+        }
+
+        if (keys != null && keys.length() > 0) {
+            message.setKeys(keys);
+        }
+    }
+
+
+    @Override
+    public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+
+        String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
+        if (propName == null) {
+            propName = RocketMQHeaders.CE_PREFIX + name;
+        }
+        message.putUserProperty(propName, value);
+        return this;
+    }
+
+    @Override
+    public RocketMQMessageWriter<R> create(final SpecVersion version) {
+        message.putUserProperty(RocketMQHeaders.SPEC_VERSION, version.toString());
+        return this;
+    }
+
+    @Override
+    public Message setEvent(final EventFormat format, final byte[] value) throws CloudEventRWException {
+        message.putUserProperty(RocketMQHeaders.CONTENT_TYPE, format.serializedContentType());
+        message.setBody(value);
+        return message;
+    }
+
+    @Override
+    public Message end(final CloudEventData data) throws CloudEventRWException {
+        message.setBody(data.toBytes());
+        return message;
+    }
+
+    @Override
+    public Message end() {
+        message.setBody(null);
+        return message;
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
new file mode 100644
index 0000000..cbab95d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.StructuredMessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketMQMessageWriterTest {
+
+    private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+    private static final String DATACONTENTTYPE_NULL = null;
+    private static final byte[] DATAPAYLOAD_NULL = null;
+
+
+    @ParameterizedTest
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    void testRequestWithStructured(CloudEvent event) {
+        String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
+        byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = StructuredMessageReader
+                .from(event, CSVFormat.INSTANCE)
+                .read(RocketMQMessageFactory.createWriter(topic, keys, tags));
+
+        assertThat(message.getTopic())
+                .isEqualTo(topic);
+        assertThat(message.getKeys())
+                .isEqualTo(keys);
+        assertThat(message.getTags())
+                .isEqualTo(tags);
+        assertThat(message.getBody())
+                .isEqualTo(expectedBuffer);
+    }
+
+    @ParameterizedTest
+    @MethodSource("binaryTestArguments")
+    void testRequestWithBinary(CloudEvent event, Map<String, String> expectedHeaders, byte[] expectedBody) {
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = RocketMQMessageFactory
+                .createWriter(topic, keys, tags)
+                .writeBinary(event);
+
+        assertThat(message.getTopic())
+                .isEqualTo(topic);
+        assertThat(message.getKeys())
+                .isEqualTo(keys);
+        assertThat(message.getTags())
+                .isEqualTo(tags);
+        assertThat(message.getBody())
+                .isEqualTo(expectedBody);
+        assertThat(message.getProperties()
+                .keySet().containsAll(expectedHeaders.keySet()));
+        assertThat(message.getProperties()
+                .values().containsAll(expectedHeaders.values()));
+    }
+
+    private static Stream<Arguments> binaryTestArguments() {
+
+        return Stream.of(
+                // V03
+                Arguments.of(
+                        Data.V03_MIN,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignore")
+                        ),
+                        DATAPAYLOAD_NULL
+                ),
+                Arguments.of(
+                        Data.V03_WITH_JSON_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignore")
+                        ),
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_JSON_DATA_WITH_EXT_STRING,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_XML_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_XML_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V03_WITH_TEXT_DATA,
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_TEXT_SERIALIZED
+
+                ),
+                // V1
+                Arguments.of(
+                        Data.V1_MIN,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+
+                        DATAPAYLOAD_NULL
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_JSON_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_JSON_DATA_WITH_EXT_STRING,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_JSON_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_XML_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_XML_SERIALIZED
+
+                ),
+                Arguments.of(
+                        Data.V1_WITH_TEXT_DATA,
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+
+                        Data.DATA_TEXT_SERIALIZED
+
+                )
+        );
+    }
+
+    private static final AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        return name.equalsIgnoreCase("ignored") ?
+                new AbstractMap.SimpleEntry<>(name, value) :
+                new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+    }
+
+    @SafeVarargs
+    private static final Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        return Stream.of(entries)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
new file mode 100644
index 0000000..25fa69a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketmqMessageFactoryTest {
+
+    private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+    private static final String DATACONTENTTYPE_NULL = null;
+    private static final byte[] DATAPAYLOAD_NULL = null;
+
+    @ParameterizedTest()
+    @MethodSource("binaryTestArguments")
+    public void readBinary(final Map<String, String> props, final String contentType, final byte[] body,
+                           final CloudEvent event) {
+        props.put(RocketMQHeaders.CONTENT_TYPE, contentType);
+        final MessageReader reader = RocketMQMessageFactory.createReader(props, body);
+        assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
+        assertThat(reader.toEvent()).isEqualTo(event);
+    }
+
+    @ParameterizedTest()
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    public void readStructured(final CloudEvent event) {
+        final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
+        final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);
+        Map<String, String> properties = new HashMap<>();
+        properties.put(RocketMQHeaders.CONTENT_TYPE, contentType);
+        final MessageReader reader = RocketMQMessageFactory.createReader(properties, contentPayload);
+        assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
+        assertThat(reader.toEvent()).isEqualTo(event);
+    }
+
+    private static Stream<Arguments> binaryTestArguments() {
+
+        return Stream.of(
+                // V03
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignore")
+                        ),
+                        DATACONTENTTYPE_NULL,
+                        DATAPAYLOAD_NULL,
+                        Data.V03_MIN
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignore")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V03_WITH_JSON_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_XML,
+                        Data.DATA_XML_SERIALIZED,
+                        Data.V03_WITH_XML_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+                                property(CloudEventV03.ID, Data.ID),
+                                property(CloudEventV03.TYPE, Data.TYPE),
+                                property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV03.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_TEXT,
+                        Data.DATA_TEXT_SERIALIZED,
+                        Data.V03_WITH_TEXT_DATA
+                ),
+                // V1
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property("ignored", "ignored")
+                        ),
+                        DATACONTENTTYPE_NULL,
+                        DATAPAYLOAD_NULL,
+                        Data.V1_MIN
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V1_WITH_JSON_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("astring", "aaa"),
+                                property("aboolean", "true"),
+                                property("anumber", "10"),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_JSON,
+                        Data.DATA_JSON_SERIALIZED,
+                        Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_XML,
+                        Data.DATA_XML_SERIALIZED,
+                        Data.V1_WITH_XML_DATA
+                ),
+                Arguments.of(
+                        properties(
+                                property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+                                property(CloudEventV1.ID, Data.ID),
+                                property(CloudEventV1.TYPE, Data.TYPE),
+                                property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+                                property(CloudEventV1.SUBJECT, Data.SUBJECT),
+                                property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+                                property("ignored", "ignored")
+                        ),
+                        Data.DATACONTENTTYPE_TEXT,
+                        Data.DATA_TEXT_SERIALIZED,
+                        Data.V1_WITH_TEXT_DATA
+                )
+        );
+    }
+
+    private static final AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        return name.equalsIgnoreCase("ignored") ?
+                new AbstractMap.SimpleEntry<>(name, value) :
+                new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
+    }
+
+    @SafeVarargs
+    private static final Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        return Stream.of(entries)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+    }
+}