You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/11/08 16:57:54 UTC
[rocketmq-cloudevents] 02/03: [ISSUE #810]CloudEvents binding polish
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-cloudevents.git
commit 0072f59ff80e9caa6f83db2cbf444f8a1615221e
Author: wangshaojie4039 <15...@163.com>
AuthorDate: Wed Sep 8 12:44:42 2021 +0800
[ISSUE #810]CloudEvents binding polish
Co-authored-by: wangshaojie <wa...@cmss.chinamobile.com>
---
pom.xml | 177 ++++++++++++++
.../cloudevent/RocketMQMessageFactory.java | 71 ++++++
.../impl/RocketMQBinaryMessageReader.java | 63 +++++
.../rocketmq/cloudevent/impl/RocketMQHeaders.java | 35 +++
.../cloudevent/impl/RocketMQMessageWriter.java | 99 ++++++++
.../cloudevent/RocketMQMessageWriterTest.java | 270 +++++++++++++++++++++
.../cloudevent/RocketmqMessageFactoryTest.java | 237 ++++++++++++++++++
7 files changed, 952 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5236365
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-cloudevents-binding</artifactId>
+ <version>1.0.0</version>
+
+ <name>rocketmq-cloudevents-binding</name>
+ <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-cloudevents-binding</url>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <!-- The tests in this module use JUnit Jupiter. Surefire gained native JUnit Platform
+                     support in 2.22.0; with older versions the Jupiter tests are not discovered. -->
+                <version>2.22.2</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <!-- Replacement for the deprecated forkMode=always: a fresh JVM for every test class. -->
+                    <forkCount>1</forkCount>
+                    <reuseForks>false</reuseForks>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>4.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.2</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.7.0</version>
+            <!-- Only used by the test sources; keep it off the classpath of consumers. -->
+            <scope>test</scope>
+        </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-core</artifactId>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <version>2.2.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.16.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
new file mode 100644
index 0000000..9d728c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/RocketMQMessageFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import org.apache.rocketmq.cloudevent.impl.RocketMQBinaryMessageReader;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.cloudevent.impl.RocketMQMessageWriter;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.lang.Nullable;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Map;
+
+
+/**
+ * Static factory for the CloudEvents readers and writers of the RocketMQ binding.
+ *
+ * <p>Readers are built from the properties and body of a RocketMQ message; writers
+ * produce a {@link Message} for a given topic (and optionally keys and tags).
+ */
+@ParametersAreNonnullByDefault
+public final class RocketMQMessageFactory {
+
+    private RocketMQMessageFactory() {
+        // prevent instantiation
+    }
+
+    /**
+     * Creates a reader from the properties and body of the given message.
+     *
+     * @param message the RocketMQ message to read the event from
+     * @return a reader for the event carried by {@code message}
+     * @throws CloudEventRWException if the message cannot be interpreted as a CloudEvent
+     */
+    public static MessageReader createReader(final Message message) throws CloudEventRWException {
+        // A Message that never had a property set may expose a null property map;
+        // substitute an empty map so the lookups below do not fail with a bare NullPointerException.
+        final Map<String, String> props = message.getProperties();
+        return createReader(
+            props != null ? props : java.util.Collections.<String, String>emptyMap(),
+            message.getBody()
+        );
+    }
+
+    /**
+     * Creates a reader from raw message properties and an optional body.
+     *
+     * <p>The content-type property ({@link RocketMQHeaders#CONTENT_TYPE}) and the spec-version
+     * property ({@link RocketMQHeaders#SPEC_VERSION}) are handed to
+     * {@link MessageUtils#parseStructuredOrBinaryMessage}, which selects either a
+     * {@link GenericStructuredMessageReader} or a {@link RocketMQBinaryMessageReader}.
+     *
+     * @param props message properties; must not be {@code null}
+     * @param body  message body, may be {@code null}
+     * @return a structured or binary reader, as chosen by {@code MessageUtils}
+     * @throws CloudEventRWException if {@code MessageUtils} cannot determine the encoding
+     */
+    public static MessageReader createReader(final Map<String, String> props, @Nullable final byte[] body) throws CloudEventRWException {
+        return MessageUtils.parseStructuredOrBinaryMessage(
+            () -> props.get(RocketMQHeaders.CONTENT_TYPE),
+            format -> new GenericStructuredMessageReader(format, body),
+            () -> props.get(RocketMQHeaders.SPEC_VERSION),
+            sv -> new RocketMQBinaryMessageReader(sv, props, body)
+        );
+    }
+
+    /**
+     * Creates a writer that produces a message for {@code topic}.
+     *
+     * @param topic destination topic set on the produced message
+     * @return a new writer
+     */
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic) {
+        return new RocketMQMessageWriter<>(topic);
+    }
+
+    /**
+     * Creates a writer that produces a message for {@code topic} with the given keys.
+     *
+     * @param topic destination topic set on the produced message
+     * @param keys  message keys; the writer skips them when null or empty
+     * @return a new writer
+     */
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys) {
+        return new RocketMQMessageWriter<>(topic, keys);
+    }
+
+    /**
+     * Creates a writer that produces a message for {@code topic} with the given keys and tags.
+     *
+     * @param topic destination topic set on the produced message
+     * @param keys  message keys; the writer skips them when null or empty
+     * @param tags  message tags; the writer skips them when null or empty
+     * @return a new writer
+     */
+    public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic, String keys, String tags) {
+        return new RocketMQMessageWriter<>(topic, keys, tags);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
new file mode 100644
index 0000000..f48f9ee
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+/**
+ * Binary-mode reader that takes CloudEvent attributes and extensions from RocketMQ
+ * message properties whose names start with {@link RocketMQHeaders#CE_PREFIX}.
+ */
+public class RocketMQBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, String> {
+
+    private final Map<String, String> headers;
+
+    /**
+     * @param version spec version of the event being read
+     * @param headers message properties; must not be {@code null}
+     * @param payload message body; a {@code null} or empty array is treated as "no data"
+     */
+    public RocketMQBinaryMessageReader(SpecVersion version, Map<String, String> headers, byte[] payload) {
+        super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
+
+        this.headers = Objects.requireNonNull(headers, "headers");
+    }
+
+    @Override
+    protected boolean isContentTypeHeader(String key) {
+        return key.equals(RocketMQHeaders.CONTENT_TYPE);
+    }
+
+    /**
+     * A property is a CloudEvents header when it starts with the prefix and has at least
+     * one character after it.
+     */
+    @Override
+    protected boolean isCloudEventsHeader(String key) {
+        return key.length() > RocketMQHeaders.CE_PREFIX.length()
+            && key.startsWith(RocketMQHeaders.CE_PREFIX);
+    }
+
+    /**
+     * Strips the prefix and lower-cases the remainder.
+     */
+    @Override
+    protected String toCloudEventsKey(String key) {
+        // Locale.ROOT keeps the mapping stable regardless of the JVM default locale
+        // (e.g. "ID" must become "id", not a dotless-i variant under a Turkish locale).
+        return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(java.util.Locale.ROOT);
+    }
+
+    @Override
+    protected void forEachHeader(BiConsumer<String, String> fn) {
+        this.headers.forEach(fn);
+    }
+
+    /** Property values are already strings, so they are passed through unchanged. */
+    @Override
+    protected String toCloudEventsValue(String value) {
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
new file mode 100644
index 0000000..f292777
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQHeaders.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.core.message.impl.MessageUtils;
+import io.cloudevents.core.v1.CloudEventV1;
+
+import java.util.Map;
+
+/**
+ * Names of the RocketMQ message properties used by the CloudEvents binding.
+ *
+ * <p>The constants below are initialised in declaration order: {@code CONTENT_TYPE} and
+ * {@code SPEC_VERSION} are looked up in {@code ATTRIBUTES_TO_HEADERS}, so that map must be
+ * declared before them.
+ */
+public class RocketMQHeaders {
+
+ /** Prefix put in front of an attribute name to form its property name. */
+ public static final String CE_PREFIX = "CE_";
+
+ /**
+ * Attribute-name to property-name mapping produced by
+ * {@code MessageUtils.generateAttributesToHeadersMapping}, using a mapper that prepends
+ * {@link #CE_PREFIX} to its input.
+ */
+ protected static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);
+
+ /** Property name mapped to the {@code datacontenttype} attribute. */
+ public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
+
+ /** Property name mapped to the {@code specversion} attribute. */
+ public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);
+
+}
+
diff --git a/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
new file mode 100644
index 0000000..6bbfc71
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.cloudevent.impl;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.apache.rocketmq.common.message.Message;
+
+
+/**
+ * Writes a CloudEvent into a RocketMQ {@link Message}.
+ *
+ * <p>In binary mode {@link #create(SpecVersion)} stores the spec version, each
+ * {@link #withContextAttribute(String, String)} call stores one user property and
+ * {@code end(...)} sets the body. In structured mode {@link #setEvent(EventFormat, byte[])}
+ * stores the content type and uses the serialized event as the body.
+ *
+ * <p>Every terminal method returns the same {@code Message} instance held by this writer.
+ *
+ * @param <R> not used by this class
+ */
+public final class RocketMQMessageWriter<R> implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {
+
+    private final Message message = new Message();
+
+    /**
+     * @param topic topic set on the produced message
+     */
+    public RocketMQMessageWriter(String topic) {
+        this(topic, null, null);
+    }
+
+    /**
+     * @param topic topic set on the produced message
+     * @param keys  keys set on the produced message; skipped when null or empty
+     */
+    public RocketMQMessageWriter(String topic, String keys) {
+        this(topic, keys, null);
+    }
+
+    /**
+     * @param topic topic set on the produced message
+     * @param keys  keys set on the produced message; skipped when null or empty
+     * @param tags  tags set on the produced message; skipped when null or empty
+     */
+    public RocketMQMessageWriter(String topic, String keys, String tags) {
+        message.setTopic(topic);
+        if (isPresent(tags)) {
+            message.setTags(tags);
+        }
+        if (isPresent(keys)) {
+            message.setKeys(keys);
+        }
+    }
+
+    /** Returns true when {@code text} is neither null nor empty. */
+    private static boolean isPresent(String text) {
+        return text != null && text.length() > 0;
+    }
+
+    /**
+     * Stores one attribute or extension as a user property. Names without an entry in
+     * {@code RocketMQHeaders.ATTRIBUTES_TO_HEADERS} get {@code CE_PREFIX} prepended.
+     */
+    @Override
+    public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+        final String mapped = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
+        final String propertyName = mapped != null ? mapped : RocketMQHeaders.CE_PREFIX + name;
+        message.putUserProperty(propertyName, value);
+        return this;
+    }
+
+    /** Starts a binary-mode write by recording the spec version as a user property. */
+    @Override
+    public RocketMQMessageWriter<R> create(final SpecVersion version) {
+        message.putUserProperty(RocketMQHeaders.SPEC_VERSION, version.toString());
+        return this;
+    }
+
+    /** Structured mode: records the format's content type and uses {@code value} as the body. */
+    @Override
+    public Message setEvent(final EventFormat format, final byte[] value) throws CloudEventRWException {
+        message.putUserProperty(RocketMQHeaders.CONTENT_TYPE, format.serializedContentType());
+        message.setBody(value);
+        return message;
+    }
+
+    /** Binary mode with data: the body becomes {@code data.toBytes()}. */
+    @Override
+    public Message end(final CloudEventData data) throws CloudEventRWException {
+        message.setBody(data.toBytes());
+        return message;
+    }
+
+    /** Binary mode without data: the body is set to {@code null}. */
+    @Override
+    public Message end() {
+        message.setBody(null);
+        return message;
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
new file mode 100644
index 0000000..cbab95d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketMQMessageWriterTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.StructuredMessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketMQMessageWriterTest {
+
+ private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+ private static final String DATACONTENTTYPE_NULL = null;
+ private static final byte[] DATAPAYLOAD_NULL = null;
+
+
+    /**
+     * Writes each fixture event in structured mode (CSV format) and checks topic, keys,
+     * tags, body and the content-type property of the produced message.
+     */
+    @ParameterizedTest
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    void testRequestWithStructured(CloudEvent event) {
+        String expectedContentType = CSVFormat.INSTANCE.serializedContentType();
+        byte[] expectedBuffer = CSVFormat.INSTANCE.serialize(event);
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = StructuredMessageReader
+            .from(event, CSVFormat.INSTANCE)
+            .read(RocketMQMessageFactory.createWriter(topic, keys, tags));
+
+        assertThat(message.getTopic())
+            .isEqualTo(topic);
+        assertThat(message.getKeys())
+            .isEqualTo(keys);
+        assertThat(message.getTags())
+            .isEqualTo(tags);
+        assertThat(message.getBody())
+            .isEqualTo(expectedBuffer);
+        // The expected content type was computed but never verified before.
+        assertThat(message.getProperties())
+            .containsEntry(RocketMQHeaders.CONTENT_TYPE, expectedContentType);
+    }
+
+    /**
+     * Writes each fixture event in binary mode and checks topic, keys, tags, body and
+     * the CloudEvents properties of the produced message.
+     */
+    @ParameterizedTest
+    @MethodSource("binaryTestArguments")
+    void testRequestWithBinary(CloudEvent event, Map<String, String> expectedHeaders, byte[] expectedBody) {
+
+        String topic = "test";
+        String keys = "keys";
+        String tags = "tags";
+
+        Message message = RocketMQMessageFactory
+            .createWriter(topic, keys, tags)
+            .writeBinary(event);
+
+        // The fixtures also carry an unprefixed "ignored" entry, which the writer never
+        // emits; only the CE_-prefixed entries are expected on the message.
+        Map<String, String> expectedCeHeaders = expectedHeaders.entrySet().stream()
+            .filter(entry -> entry.getKey().startsWith(RocketMQHeaders.CE_PREFIX))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        assertThat(message.getTopic())
+            .isEqualTo(topic);
+        assertThat(message.getKeys())
+            .isEqualTo(keys);
+        assertThat(message.getTags())
+            .isEqualTo(tags);
+        assertThat(message.getBody())
+            .isEqualTo(expectedBody);
+        // assertThat(boolean) without a terminal check verifies nothing; assert on the map itself.
+        assertThat(message.getProperties())
+            .containsAllEntriesOf(expectedCeHeaders);
+    }
+
+ /**
+ * Argument sets for {@code testRequestWithBinary}: each entry is
+ * (event fixture, expected property map, expected body).
+ *
+ * Covers the V03 and V1 fixtures: minimal, JSON data, JSON data with string
+ * extensions, XML data and text data. Every property map also contains one
+ * entry keyed "ignored", which {@code property} leaves without the CE prefix.
+ */
+ private static Stream<Arguments> binaryTestArguments() {
+
+ return Stream.of(
+ // V03
+ Arguments.of(
+ Data.V03_MIN,
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignore")
+ ),
+ DATAPAYLOAD_NULL
+ ),
+ Arguments.of(
+ Data.V03_WITH_JSON_DATA,
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignore")
+ ),
+ Data.DATA_JSON_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V03_WITH_JSON_DATA_WITH_EXT_STRING,
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATA_JSON_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V03_WITH_XML_DATA,
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_XML_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V03_WITH_TEXT_DATA,
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_TEXT_SERIALIZED
+
+ ),
+ // V1
+ Arguments.of(
+ Data.V1_MIN,
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignored")
+ ),
+
+ DATAPAYLOAD_NULL
+
+ ),
+ Arguments.of(
+ Data.V1_WITH_JSON_DATA,
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_JSON_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V1_WITH_JSON_DATA_WITH_EXT_STRING,
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_JSON_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V1_WITH_XML_DATA,
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_XML_SERIALIZED
+
+ ),
+ Arguments.of(
+ Data.V1_WITH_TEXT_DATA,
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+
+ Data.DATA_TEXT_SERIALIZED
+
+ )
+ );
+ }
+
+    /**
+     * Builds one expected-property entry. The name "ignored" (any case) is kept as is;
+     * every other name is formatted through {@code PREFIX_TEMPLATE}.
+     */
+    private static final AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        final String entryKey;
+        if (name.equalsIgnoreCase("ignored")) {
+            entryKey = name;
+        } else {
+            entryKey = String.format(PREFIX_TEMPLATE, name);
+        }
+        return new AbstractMap.SimpleEntry<>(entryKey, value);
+    }
+
+    /** Collects the given entries into a map keyed by entry key. */
+    @SafeVarargs
+    private static final Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        final Stream<AbstractMap.SimpleEntry<String, String>> entryStream = Stream.of(entries);
+        return entryStream.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
new file mode 100644
index 0000000..25fa69a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/cloudevent/RocketmqMessageFactoryTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.cloudevent;
+
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.mock.CSVFormat;
+import io.cloudevents.core.test.Data;
+import io.cloudevents.core.v03.CloudEventV03;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.types.Time;
+import org.apache.rocketmq.cloudevent.impl.RocketMQHeaders;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RocketmqMessageFactoryTest {
+
+ private static final String PREFIX_TEMPLATE = RocketMQHeaders.CE_PREFIX + "%s";
+ private static final String DATACONTENTTYPE_NULL = null;
+ private static final byte[] DATAPAYLOAD_NULL = null;
+
+    /**
+     * Builds a reader from binary-mode properties and checks the detected encoding and
+     * the reconstructed event.
+     */
+    @ParameterizedTest()
+    @MethodSource("binaryTestArguments")
+    public void readBinary(final Map<String, String> props, final String contentType, final byte[] body,
+                           final CloudEvent event) {
+        // Fixtures without a content type pass null; leave the property absent instead of
+        // storing a null value, so the reader is exercised with a map free of null values.
+        if (contentType != null) {
+            props.put(RocketMQHeaders.CONTENT_TYPE, contentType);
+        }
+        final MessageReader reader = RocketMQMessageFactory.createReader(props, body);
+        assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
+        assertThat(reader.toEvent()).isEqualTo(event);
+    }
+
+    /**
+     * Serializes each fixture event with the CSV format, feeds it to the factory with a
+     * matching content-type property and checks the structured reader's result.
+     */
+    @ParameterizedTest()
+    @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
+    public void readStructured(final CloudEvent event) {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(RocketMQHeaders.CONTENT_TYPE, CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8");
+        final byte[] serialized = CSVFormat.INSTANCE.serialize(event);
+
+        final MessageReader structuredReader = RocketMQMessageFactory.createReader(headers, serialized);
+
+        assertThat(structuredReader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
+        assertThat(structuredReader.toEvent()).isEqualTo(event);
+    }
+
+ /**
+ * Argument sets for {@code readBinary}: each entry is
+ * (property map, content type, body, expected event fixture).
+ *
+ * Covers the V03 and V1 fixtures: minimal, JSON data, JSON data with string
+ * extensions, XML data and text data. Every property map also contains one
+ * entry keyed "ignored", which {@code property} leaves without the CE prefix.
+ */
+ private static Stream<Arguments> binaryTestArguments() {
+
+ return Stream.of(
+ // V03
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignore")
+ ),
+ DATACONTENTTYPE_NULL,
+ DATAPAYLOAD_NULL,
+ Data.V03_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignore")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V03_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
+ property(CloudEventV03.ID, Data.ID),
+ property(CloudEventV03.TYPE, Data.TYPE),
+ property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV03.SUBJECT, Data.SUBJECT),
+ property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V03_WITH_TEXT_DATA
+ ),
+ // V1
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property("ignored", "ignored")
+ ),
+ DATACONTENTTYPE_NULL,
+ DATAPAYLOAD_NULL,
+ Data.V1_MIN
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("astring", "aaa"),
+ property("aboolean", "true"),
+ property("anumber", "10"),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_JSON,
+ Data.DATA_JSON_SERIALIZED,
+ Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_XML,
+ Data.DATA_XML_SERIALIZED,
+ Data.V1_WITH_XML_DATA
+ ),
+ Arguments.of(
+ properties(
+ property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
+ property(CloudEventV1.ID, Data.ID),
+ property(CloudEventV1.TYPE, Data.TYPE),
+ property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
+ property(CloudEventV1.SUBJECT, Data.SUBJECT),
+ property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
+ property("ignored", "ignored")
+ ),
+ Data.DATACONTENTTYPE_TEXT,
+ Data.DATA_TEXT_SERIALIZED,
+ Data.V1_WITH_TEXT_DATA
+ )
+ );
+ }
+
+    /**
+     * Builds one message-property entry. The name "ignored" (any case) is kept as is;
+     * every other name is formatted through {@code PREFIX_TEMPLATE}.
+     */
+    private static final AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
+        final String entryKey;
+        if (name.equalsIgnoreCase("ignored")) {
+            entryKey = name;
+        } else {
+            entryKey = String.format(PREFIX_TEMPLATE, name);
+        }
+        return new AbstractMap.SimpleEntry<>(entryKey, value);
+    }
+
+    /** Collects the given entries into a map keyed by entry key. */
+    @SafeVarargs
+    private static final Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
+        final Stream<AbstractMap.SimpleEntry<String, String>> entryStream = Stream.of(entries);
+        return entryStream.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+    }
+}