You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/11/24 08:41:02 UTC
[rocketmq-connect] branch master updated: add mqtt source connector (#359)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new ff2295d7 add mqtt source connector (#359)
ff2295d7 is described below
commit ff2295d704fb05bbcf569d1a7d8adb8a273d01bc
Author: AetherZhuang <52...@users.noreply.github.com>
AuthorDate: Thu Nov 24 16:40:56 2022 +0800
add mqtt source connector (#359)
* add mqtt source connector
* add mqtt source connector use schema
* add mqtt source connector 3
Co-authored-by: zhuangxingwang <zh...@cmss.chinamobile.com>
---
connectors/rocketmq-connect-mqtt/pom.xml | 245 +++++++++++++++++++++
.../connect/mqtt/config/ConnectorConfig.java | 113 ++++++++++
.../connect/mqtt/config/SourceConnectorConfig.java | 93 ++++++++
.../mqtt/connector/MqttSourceConnector.java | 57 +++++
.../connect/mqtt/connector/MqttSourceTask.java | 130 +++++++++++
.../rocketmq/connect/mqtt/source/Replicator.java | 108 +++++++++
.../rocketmq/connect/mqtt/util/ConfigUtil.java | 70 ++++++
.../rocketmq/connect/mqtt/util/HmacSHA1Util.java | 44 ++++
.../connect/mqtt/util/MqttConnectionUtil.java | 20 ++
.../apache/rocketmq/connect/mqtt/util/Utils.java | 30 +++
.../java/connector/MqttSourceConnectorTest.java | 48 ++++
.../test/java/connector/MqttSourceTaskTest.java | 151 +++++++++++++
.../src/test/java/source/ReplicatorTest.java | 57 +++++
13 files changed, 1166 insertions(+)
diff --git a/connectors/rocketmq-connect-mqtt/pom.xml b/connectors/rocketmq-connect-mqtt/pom.xml
new file mode 100644
index 00000000..fe9eb4e3
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/pom.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-mqtt</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>rocketmq-connect-mqtt</name>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <scm>
+ <url>https://github.com/openmessaging/openmessaging-connector</url>
+ <connection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</connection>
+ <developerConnection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</developerConnection>
+ <tag>HEAD</tag>
+ </scm>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <rocketmq.version>4.7.1</rocketmq.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>README.md</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>0.3.1-alpha</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.83</version>
+ </dependency>
+ <dependency>
+ <groupId>io.javalin</groupId>
+ <artifactId>javalin</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <!-- Same version as the logback-core dependency declared right after this one; the two artifacts are released together. -->
+ <version>1.2.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.2.9</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.15</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java
new file mode 100644
index 00000000..4e86676a
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ConnectorConfig {
+
+ @SuppressWarnings("serial")
+ public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+ {
+ add(MQTT_BROKER_URL);
+ add(MQTT_ACCESS_KEY);
+ add(MQTT_SECRET_KEY);
+ }
+ };
+
+ public static final String MQTT_BROKER_URL = "mqttBrokerUrl";
+ public static final String MQTT_ACCESS_KEY = "mqttAccessKey";
+ public static final String MQTT_SECRET_KEY = "mqttSecretKey";
+
+ protected String mqttAccessKey;
+ protected String mqttSecretKey;
+ protected String mqttBrokerUrl;
+
+ public void load(KeyValue props) {
+
+ properties2Object(props, this);
+ }
+
+ private void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ public String getMqttAccessKey() {
+ return mqttAccessKey;
+ }
+
+ public void setMqttAccessKey(String mqttAccessKey) {
+ this.mqttAccessKey = mqttAccessKey;
+ }
+
+ public String getMqttSecretKey() {
+ return mqttSecretKey;
+ }
+
+ public void setMqttSecretKey(String mqttSecretKey) {
+ this.mqttSecretKey = mqttSecretKey;
+ }
+
+ public String getMqttBrokerUrl() {
+ return mqttBrokerUrl;
+ }
+
+ public void setMqttBrokerUrl(String mqttBrokerUrl) {
+ this.mqttBrokerUrl = mqttBrokerUrl;
+ }
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java
new file mode 100644
index 00000000..3f5033f6
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration of the MQTT source connector: the inherited connection
+ * settings plus the MQTT topic to read from.
+ */
+public class SourceConnectorConfig extends ConnectorConfig {
+
+    public static final String MESSAGE = "message";
+    public static final String POSITION = "position";
+    public static final String MQTT_SOURCE_TOPIC = "sourceTopic";
+
+    /**
+     * Keys that must be present before the source connector starts. This field
+     * hides {@link ConnectorConfig#REQUEST_CONFIG}, so the parent's connection
+     * keys are copied in explicitly; otherwise only the topic would be checked
+     * and a missing broker URL or secret key would surface later, at connect time.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>();
+
+    static {
+        REQUEST_CONFIG.addAll(ConnectorConfig.REQUEST_CONFIG);
+        REQUEST_CONFIG.add(MQTT_SOURCE_TOPIC);
+    }
+
+    protected String sourceTopic;
+
+    /**
+     * Copies every matching property from {@code props} into this bean.
+     * The inherited loader inspects the runtime class, so it already reaches
+     * {@link #setSourceTopic(String)}; no separate copy of it is needed here.
+     *
+     * @param props source of the configuration values
+     */
+    @Override
+    public void load(KeyValue props) {
+        super.load(props);
+    }
+
+    public String getSourceTopic() {
+        return sourceTopic;
+    }
+
+    public void setSourceTopic(String sourceTopic) {
+        this.sourceTopic = sourceTopic;
+    }
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java
new file mode 100644
index 00000000..c3dce487
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+
+/**
+ * Source connector that hands its own configuration, unchanged, to a single
+ * {@link MqttSourceTask}.
+ */
+public class MqttSourceConnector extends SourceConnector {
+
+    /** Configuration received in {@link #start(KeyValue)}; {@code null} before start and after stop. */
+    private KeyValue config;
+
+    /**
+     * Checks that every key in {@link SourceConnectorConfig#REQUEST_CONFIG} is
+     * present, then remembers the configuration.
+     *
+     * @param config connector configuration
+     * @throws ConnectException if a required key is absent
+     */
+    @Override
+    public void start(KeyValue config) {
+        for (String requestKey : SourceConnectorConfig.REQUEST_CONFIG) {
+            if (config.containsKey(requestKey)) {
+                continue;
+            }
+            throw new ConnectException("Request config key: " + requestKey);
+        }
+        this.config = config;
+    }
+
+    /** Forgets the configuration captured by {@link #start(KeyValue)}. */
+    @Override
+    public void stop() {
+        this.config = null;
+    }
+
+    /**
+     * Returns a one-element list holding the connector configuration.
+     *
+     * @param maxTasks not used; exactly one task configuration is always returned
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        final List<KeyValue> perTask = new ArrayList<>();
+        perTask.add(this.config);
+        return perTask;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MqttSourceTask.class;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java
new file mode 100644
index 00000000..33d31ef0
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source task that drains the MQTT messages buffered by a {@link Replicator}
+ * and turns each one into a {@link ConnectRecord} with a single string field.
+ */
+public class MqttSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttSourceTask.class);
+
+    /** How long one {@link #poll()} call waits for a message before returning empty. */
+    private static final long POLL_TIMEOUT_MS = 1000L;
+
+    private Replicator replicator;
+
+    private SourceConnectorConfig sourceConnectConfig;
+
+    /**
+     * Waits up to one second for the next buffered MQTT message.
+     *
+     * @return a list with one record, or an empty list when nothing arrived or an error occurred
+     */
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        try {
+            MqttMessage message = replicator.getQueue().poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                res.add(message2ConnectRecord(message));
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the thread that drives this task can see the interruption.
+            Thread.currentThread().interrupt();
+            log.warn("mqtt task poll interrupted, sourceTopic: {}", currentTopic());
+        } catch (Exception e) {
+            // Only the topic is logged: the full config contains the MQTT secret key.
+            log.error("mqtt task poll error, sourceTopic: {}", currentTopic(), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the configuration and starts the replicator. Failures are logged
+     * and not rethrown.
+     *
+     * @param props task configuration
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.sourceConnectConfig = new SourceConnectorConfig();
+            this.sourceConnectConfig.load(props);
+            this.replicator = new Replicator(sourceConnectConfig);
+            this.replicator.start();
+        } catch (Exception e) {
+            // NOTE(review): the task keeps running after a failed start and will only ever
+            // return empty polls - confirm whether the runtime expects an exception here.
+            log.error("mqtt task start failed.", e);
+        }
+    }
+
+    /** Stops the replicator if one was created; failures are logged and not rethrown. */
+    @Override
+    public void stop() {
+        if (replicator == null) {
+            return;
+        }
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("mqtt task stop failed.", e);
+        }
+    }
+
+    /** Returns the configured source topic, or {@code null} when no configuration is loaded. */
+    private String currentTopic() {
+        return sourceConnectConfig == null ? null : sourceConnectConfig.getSourceTopic();
+    }
+
+    /** Wraps one MQTT message in a record stamped with the current time. */
+    private ConnectRecord message2ConnectRecord(MqttMessage message) {
+        Schema schema = SchemaBuilder.struct().name("topicName").build();
+        final List<Field> fields = buildFields();
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(),
+            buildRecordOffset(),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, message, schema));
+        connectRecord.setExtensions(buildExtendField());
+        return connectRecord;
+    }
+
+    /** Every record reports the same offset: position 0. */
+    private RecordOffset buildRecordOffset() {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(SourceConnectorConfig.POSITION, 0L);
+        return new RecordOffset(offsetMap);
+    }
+
+    /** Every record reports the same partition: "defaultPartition". */
+    private RecordPartition buildRecordPartition() {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", "defaultPartition");
+        return new RecordPartition(partitionMap);
+    }
+
+    /** The schema has a single string field that carries the message body. */
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, SourceConnectorConfig.MESSAGE, stringSchema));
+        return fields;
+    }
+
+    /** Stores the message payload, decoded as UTF-8, in the first field. */
+    private Struct buildPayLoad(List<Field> fields, MqttMessage message, Schema schema) {
+        Struct payLoad = new Struct(schema);
+        // Explicit charset: the platform default would make the result depend on the host.
+        payLoad.put(fields.get(0), new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
+        return payLoad;
+    }
+
+    /** Records carry an empty extension map. */
+    private KeyValue buildExtendField() {
+        return new DefaultKeyValue();
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java
new file mode 100644
index 00000000..ff5aa289
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.source;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.util.MqttConnectionUtil;
+import org.apache.rocketmq.connect.mqtt.util.Utils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+ private final Logger log = LoggerFactory.getLogger(Replicator.class);
+
+ private MqttClient mqttClient;
+ private SourceConnectorConfig sourceConnectConfig;
+ private BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+
+ public Replicator(SourceConnectorConfig sourceConnectConfig) {
+ this.sourceConnectConfig = sourceConnectConfig;
+ }
+
+ public void start() throws Exception {
+ String brokerUrl = sourceConnectConfig.getMqttBrokerUrl();
+ MemoryPersistence memoryPersistence = new MemoryPersistence();
+ String sourceTopic = sourceConnectConfig.getSourceTopic();
+ String recvClientId = Utils.createClientId("recv");
+ MqttConnectOptions mqttConnectOptions = MqttConnectionUtil.buildMqttConnectOptions(recvClientId, sourceConnectConfig);
+ mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
+ mqttClient.setCallback(new MqttCallbackExtended() {
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ log.info("{} connect success to {}", recvClientId, serverURI);
+ try {
+ mqttClient.subscribe(sourceTopic, 1);
+ } catch (Exception e) {
+ log.error("{} subscribe failed", recvClientId);
+ }
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.error("connection lost {}", throwable.getMessage());
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ try {
+ commit(mqttMessage, true);
+ } catch (Exception e) {
+ throw new ConnectException("commit MqttMessage failed", e);
+ }
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ }
+ });
+
+ try {
+ mqttClient.connect(mqttConnectOptions);
+ log.info("Replicator start succeed");
+ } catch (Exception e) {
+ log.error("connect fail {}", e.getMessage());
+ }
+ }
+
+ public void stop() throws Exception {
+ mqttClient.disconnect();
+ }
+
+ public void commit(MqttMessage message, boolean isComplete) {
+ queue.add(message);
+ }
+
+ public SourceConnectorConfig getConfig() {
+ return this.sourceConnectConfig;
+ }
+
+ public BlockingQueue<MqttMessage> getQueue() {
+ return queue;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java
new file mode 100644
index 00000000..b23611e1
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+ public static <T> void load(KeyValue props, Object object) {
+
+ properties2Object(props, object);
+ }
+
+ private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java
new file mode 100644
index 00000000..c3c060b6
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.commons.codec.binary.Base64;
+
+/**
+ * HMAC-SHA1 signing helpers. Signatures are exchanged as Base64 text encoded
+ * in UTF-8.
+ */
+public class HmacSHA1Util {
+    private static final Charset CHARSET = Charset.forName("UTF-8");
+    private static final String ALGORITHM = "HmacSHA1";
+
+    /**
+     * Signs {@code text} with {@code secretKey}.
+     *
+     * @param text      text to sign
+     * @param secretKey key material, taken as UTF-8 bytes
+     * @return the Base64-encoded HMAC-SHA1 of {@code text}
+     * @throws InvalidKeyException      if the key is rejected by the MAC
+     * @throws NoSuchAlgorithmException if HmacSHA1 is not available
+     */
+    public static String macSignature(String text,
+        String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
+        Mac mac = Mac.getInstance(ALGORITHM);
+        mac.init(new SecretKeySpec(secretKey.getBytes(CHARSET), ALGORITHM));
+        byte[] bytes = mac.doFinal(text.getBytes(CHARSET));
+        // JDK encoder: standard alphabet, padded, no line breaks.
+        return java.util.Base64.getEncoder().encodeToString(bytes);
+    }
+
+    /**
+     * Checks a presented signature against the one computed for {@code text}.
+     *
+     * @param text      text that was signed
+     * @param input     presented signature, as the UTF-8 bytes of its Base64 form
+     * @param secretKey key material, taken as UTF-8 bytes
+     * @return {@code true} only when {@code input} equals the expected signature;
+     *         {@code false} for a {@code null} input
+     * @throws NoSuchAlgorithmException if HmacSHA1 is not available
+     * @throws InvalidKeyException      if the key is rejected by the MAC
+     */
+    public static boolean validateSign(String text, byte[] input,
+        String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
+        if (input == null) {
+            return false;
+        }
+        byte[] expected = macSignature(text, secretKey).getBytes(CHARSET);
+        // Constant-time comparison, so timing does not reveal how much of a forged signature matched.
+        return java.security.MessageDigest.isEqual(expected, input);
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java
new file mode 100644
index 00000000..2dea7314
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.connect.mqtt.util;
+
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.rocketmq.connect.mqtt.config.ConnectorConfig;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+/**
+ * Builds the Paho connect options used by the MQTT connector.
+ */
+public class MqttConnectionUtil {
+    /** Value handed to {@code setKeepAliveInterval}. */
+    private static final int KEEP_ALIVE_INTERVAL = 60;
+    /** Value handed to {@code setMaxInflight}. */
+    private static final int MAX_INFLIGHT = 10000;
+
+    /**
+     * Creates connect options for the given client.
+     *
+     * <p>The user name is the configured access key and the password is the
+     * HMAC-SHA1 signature of {@code clientId} computed with the configured secret key.
+     * Clean session and automatic reconnect are both switched on.
+     *
+     * @param clientId        the MQTT client id; it is the text that gets signed
+     * @param connectorConfig supplies the access key and the secret key
+     * @return freshly built connect options
+     * @throws NoSuchAlgorithmException if the signature algorithm is unavailable
+     * @throws InvalidKeyException      if the secret key is rejected by the MAC
+     */
+    public static MqttConnectOptions buildMqttConnectOptions(
+        String clientId, ConnectorConfig connectorConfig) throws NoSuchAlgorithmException, InvalidKeyException {
+        final String accessKey = connectorConfig.getMqttAccessKey();
+        final String signature = HmacSHA1Util.macSignature(clientId, connectorConfig.getMqttSecretKey());
+
+        final MqttConnectOptions options = new MqttConnectOptions();
+        // Credentials.
+        options.setUserName(accessKey);
+        options.setPassword(signature.toCharArray());
+        // Session and transport behaviour.
+        options.setCleanSession(true);
+        options.setAutomaticReconnect(true);
+        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+        options.setMaxInflight(MAX_INFLIGHT);
+        return options;
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java
new file mode 100644
index 00000000..e3665618
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Small shared helpers for the MQTT connector.
+ */
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    /**
+     * Builds a client id of the form {@code <prefix>-<current time in millis>}.
+     *
+     * @param prefix the leading part of the id
+     * @return the prefix, a dash, and {@link System#currentTimeMillis()} at call time
+     */
+    public static String createClientId(String prefix) {
+        final long timestamp = System.currentTimeMillis();
+        return prefix + "-" + timestamp;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java
new file mode 100644
index 00000000..7514657d
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceConnector;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link MqttSourceConnector}.
+ */
+public class MqttSourceConnectorTest {
+
+    private final MqttSourceConnector connector = new MqttSourceConnector();
+
+    /** The connector must report {@link MqttSourceTask} as its task class. */
+    @Test
+    public void taskClassTest() {
+        // JUnit's contract is assertEquals(expected, actual); the other order
+        // yields a misleading "expected ... but was ..." message on failure.
+        assertEquals(MqttSourceTask.class, connector.taskClass());
+    }
+
+    /**
+     * Before {@code start} the first task config is null; after {@code start}
+     * it is the very configuration the connector was started with.
+     */
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(null, connector.taskConfigs(2).get(0));
+
+        // Populate every required key so that start() is given a complete config.
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : SourceConnectorConfig.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.start(keyValue);
+        assertEquals(keyValue, connector.taskConfigs(2).get(0));
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java
new file mode 100644
index 00000000..50d11c46
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceTask;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.apache.rocketmq.connect.mqtt.util.HmacSHA1Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class MqttSourceTaskTest {
+ private MqttClient mqttClient = null;
+
+ @Before
+ public void before() throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
+ MemoryPersistence memoryPersistence = new MemoryPersistence();
+ String brokerUrl = "tcp://100.76.11.96:1883";
+ String sendClientId = "send01";
+ MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
+ mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
+ mqttClient.setTimeToWait(5000L);
+ mqttClient.setCallback(new MqttCallbackExtended() {
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ System.out.println(sendClientId + " connect success to " + serverURI);
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ }
+ });
+ try {
+ mqttClient.connect(mqttConnectOptions);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void test() throws MqttException {
+ KeyValue kv = new DefaultKeyValue();
+ kv.put("mqttBrokerUrl", "tcp://100.76.11.96:1883");
+ kv.put("mqttAccessKey", "rocketmq");
+ kv.put("mqttSecretKey", "12345678");
+ kv.put("sourceTopic", "topic_producer");
+ MqttSourceTask task = new MqttSourceTask();
+ task.start(kv);
+ for (int i = 0; i < 5; ) {
+ Collection<ConnectRecord> sourceDataEntry = task.poll();
+ String msg = "r1_" + System.currentTimeMillis() + "_" + i;
+ MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
+ message.setQos(1);
+ String mqttSendTopic = "topic_producer";
+ mqttClient.publish(mqttSendTopic, message);
+ System.out.println("send: " + mqttSendTopic + ", " + msg);
+ i = i + sourceDataEntry.size();
+ System.out.println(sourceDataEntry);
+ }
+ }
+
+ @Test
+ public void pollTest() throws Exception {
+ MqttSourceTask task = new MqttSourceTask();
+ MqttMessage textMessage = new MqttMessage();
+ textMessage.setPayload("hello rocketmq".getBytes(StandardCharsets.UTF_8));
+
+ Replicator replicatorObject = Mockito.mock(Replicator.class);
+ BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+ Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+ Field replicator = MqttSourceTask.class.getDeclaredField("replicator");
+ replicator.setAccessible(true);
+ replicator.set(task, replicatorObject);
+
+ Field config = MqttSourceTask.class.getDeclaredField("sourceConnectConfig");
+ config.setAccessible(true);
+ config.set(task, new SourceConnectorConfig());
+
+ queue.put(textMessage);
+ Collection<ConnectRecord> list = task.poll();
+ Assert.assertEquals(list.size(), 1);
+
+ list = task.poll();
+ Assert.assertEquals(list.size(), 0);
+
+ }
+
+ @After
+ public void after() throws MqttException {
+ mqttClient.disconnect();
+ }
+
+ private static MqttConnectOptions buildMqttConnectOptions(
+ String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setKeepAliveInterval(60);
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setMaxInflight(10000);
+ connOpts.setUserName("rocketmq");
+ connOpts.setPassword(HmacSHA1Util.macSignature(clientId, "12345678").toCharArray());
+ return connOpts;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java
new file mode 100644
index 00000000..6f8661df
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java
@@ -0,0 +1,57 @@
+package source;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link Replicator}.
+ *
+ * <p>NOTE(review): {@link #before()} calls {@code replicator.start()} with a fixed
+ * broker address, so these tests presumably need that broker to be reachable.
+ */
+public class ReplicatorTest {
+
+    private Replicator replicator;
+    private SourceConnectorConfig config;
+
+    /** Builds a fully populated config and starts a replicator on it. */
+    @Before
+    public void before() throws Exception {
+        config = new SourceConnectorConfig();
+        config.setMqttBrokerUrl("tcp://100.76.11.96:1883");
+        config.setMqttAccessKey("rocketmq");
+        config.setMqttSecretKey("12345678");
+        config.setSourceTopic("topic_producer");
+        replicator = new Replicator(config);
+        replicator.start();
+    }
+
+    /** Stopping a started replicator must not throw. */
+    @Test
+    public void stop() throws Exception {
+        replicator.stop();
+    }
+
+    /** A committed message must be the next element available from the queue. */
+    @Test
+    public void commitAddGetQueueTest() {
+        MqttMessage message = new MqttMessage();
+        replicator.commit(message, false);
+        // assertEquals takes (expected, actual).
+        Assert.assertEquals(message, replicator.getQueue().poll());
+    }
+
+    /** The replicator must expose the config it was constructed with. */
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(config, replicator.getConfig());
+    }
+}