You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/11/24 08:41:02 UTC

[rocketmq-connect] branch master updated: add mqtt source connector (#359)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ff2295d7 add mqtt source connector (#359)
ff2295d7 is described below

commit ff2295d704fb05bbcf569d1a7d8adb8a273d01bc
Author: AetherZhuang <52...@users.noreply.github.com>
AuthorDate: Thu Nov 24 16:40:56 2022 +0800

    add mqtt source connector (#359)
    
    * add mqtt source connector
    
    * add mqtt source connector use schema
    
    * add mqtt source connector 3
    
    Co-authored-by: zhuangxingwang <zh...@cmss.chinamobile.com>
---
 connectors/rocketmq-connect-mqtt/pom.xml           | 245 +++++++++++++++++++++
 .../connect/mqtt/config/ConnectorConfig.java       | 113 ++++++++++
 .../connect/mqtt/config/SourceConnectorConfig.java |  93 ++++++++
 .../mqtt/connector/MqttSourceConnector.java        |  57 +++++
 .../connect/mqtt/connector/MqttSourceTask.java     | 130 +++++++++++
 .../rocketmq/connect/mqtt/source/Replicator.java   | 108 +++++++++
 .../rocketmq/connect/mqtt/util/ConfigUtil.java     |  70 ++++++
 .../rocketmq/connect/mqtt/util/HmacSHA1Util.java   |  44 ++++
 .../connect/mqtt/util/MqttConnectionUtil.java      |  20 ++
 .../apache/rocketmq/connect/mqtt/util/Utils.java   |  30 +++
 .../java/connector/MqttSourceConnectorTest.java    |  48 ++++
 .../test/java/connector/MqttSourceTaskTest.java    | 151 +++++++++++++
 .../src/test/java/source/ReplicatorTest.java       |  57 +++++
 13 files changed, 1166 insertions(+)

diff --git a/connectors/rocketmq-connect-mqtt/pom.xml b/connectors/rocketmq-connect-mqtt/pom.xml
new file mode 100644
index 00000000..fe9eb4e3
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/pom.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-mqtt</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>rocketmq-connect-mqtt</name>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>https://github.com/openmessaging/openmessaging-connector</url>
+        <connection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</connection>
+        <developerConnection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.7.1</rocketmq.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>0.3.1-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+        <dependency>
+            <groupId>io.javalin</groupId>
+            <artifactId>javalin</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <!-- Same version as logback-core: mixing 1.2.0 with 1.2.9 pairs mismatched releases. -->
+            <version>1.2.9</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.2.9</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.15</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java
new file mode 100644
index 00000000..4e86676a
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/ConnectorConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Base configuration shared by the MQTT connectors. It holds {@code mqttBrokerUrl},
+ * {@code mqttAccessKey} and {@code mqttSecretKey}.
+ *
+ * <p>{@link #load(KeyValue)} fills the fields reflectively: for every public one-argument
+ * {@code setXxx} method of the runtime class, the property named {@code xxx} is read from the
+ * supplied {@link KeyValue}, converted to the setter's parameter type and handed to the setter.
+ */
+public class ConnectorConfig {
+
+    public static final String MQTT_BROKER_URL = "mqttBrokerUrl";
+    public static final String MQTT_ACCESS_KEY = "mqttAccessKey";
+    public static final String MQTT_SECRET_KEY = "mqttSecretKey";
+
+    /** Configuration keys this class declares as required. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        REQUEST_CONFIG.add(MQTT_BROKER_URL);
+        REQUEST_CONFIG.add(MQTT_ACCESS_KEY);
+        REQUEST_CONFIG.add(MQTT_SECRET_KEY);
+    }
+
+    protected String mqttAccessKey;
+    protected String mqttSecretKey;
+    protected String mqttBrokerUrl;
+
+    /**
+     * Copies matching properties from {@code props} into this object through its public setters.
+     *
+     * @param props source of the property values; keys are the setter names without the
+     *              {@code set} prefix and with the first letter lower-cased
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Applies every property of {@code p} that has a matching public setter on {@code object}.
+     *
+     * <p>Loading is best effort: a value that cannot be parsed, or a setter that throws, leaves
+     * the corresponding field untouched and does not abort the remaining properties.
+     *
+     * @param p      property source
+     * @param object target whose public {@code setXxx(value)} methods are invoked
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            final String mn = method.getName();
+            final Class<?>[] pt = method.getParameterTypes();
+            // Only "setXxx(value)" shaped methods can be fed from a single property. A method
+            // named exactly "set" has no property name, and a setter with a different arity
+            // cannot be invoked with one argument.
+            if (!mn.startsWith("set") || mn.length() <= 3 || pt.length != 1) {
+                continue;
+            }
+            try {
+                // Locale.ROOT keeps the derived key stable regardless of the JVM default locale.
+                final String key = mn.substring(3, 4).toLowerCase(java.util.Locale.ROOT) + mn.substring(4);
+                final String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                final Object arg = convert(pt[0], property);
+                if (arg == null) {
+                    // Unsupported parameter type: nothing sensible to pass.
+                    continue;
+                }
+                method.invoke(object, arg);
+            } catch (Exception ignored) {
+                // Deliberately best effort (see method comment). Only Exception is caught so
+                // that JVM Errors such as OutOfMemoryError are no longer silently discarded.
+            }
+        }
+    }
+
+    /**
+     * Converts the textual property value to the given setter parameter type.
+     *
+     * @param type  parameter type of the setter
+     * @param value raw property value, never {@code null}
+     * @return the converted value, or {@code null} when {@code type} is not supported
+     * @throws NumberFormatException if {@code value} is not a valid number for a numeric type
+     */
+    private static Object convert(final Class<?> type, final String value) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(value);
+        }
+        if (type == long.class || type == Long.class) {
+            return Long.parseLong(value);
+        }
+        if (type == double.class || type == Double.class) {
+            return Double.parseDouble(value);
+        }
+        if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(value);
+        }
+        if (type == float.class || type == Float.class) {
+            return Float.parseFloat(value);
+        }
+        if (type == String.class) {
+            return value;
+        }
+        return null;
+    }
+
+    public String getMqttAccessKey() {
+        return mqttAccessKey;
+    }
+
+    public void setMqttAccessKey(String mqttAccessKey) {
+        this.mqttAccessKey = mqttAccessKey;
+    }
+
+    public String getMqttSecretKey() {
+        return mqttSecretKey;
+    }
+
+    public void setMqttSecretKey(String mqttSecretKey) {
+        this.mqttSecretKey = mqttSecretKey;
+    }
+
+    public String getMqttBrokerUrl() {
+        return mqttBrokerUrl;
+    }
+
+    public void setMqttBrokerUrl(String mqttBrokerUrl) {
+        this.mqttBrokerUrl = mqttBrokerUrl;
+    }
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java
new file mode 100644
index 00000000..3f5033f6
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/config/SourceConnectorConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration of the MQTT source connector: the base MQTT settings inherited from
+ * {@link ConnectorConfig} plus the topic to read from.
+ */
+public class SourceConnectorConfig extends ConnectorConfig {
+
+    /** Name of the record field that carries the MQTT payload. */
+    public static final String MESSAGE = "message";
+    /** Key used in the record offset map. */
+    public static final String POSITION = "position";
+    public static final String MQTT_SOURCE_TOPIC = "sourceTopic";
+
+    /**
+     * Configuration keys this class declares as required.
+     *
+     * <p>NOTE(review): this field hides {@code ConnectorConfig.REQUEST_CONFIG}, so code that
+     * reads {@code SourceConnectorConfig.REQUEST_CONFIG} checks only {@code sourceTopic} and not
+     * the base MQTT keys. Confirm that this is intended.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        REQUEST_CONFIG.add(MQTT_SOURCE_TOPIC);
+    }
+
+    protected String sourceTopic;
+
+    /**
+     * Copies matching properties from {@code props} into this object through its public setters.
+     *
+     * <p>The inherited loader reflects over the runtime class, so it already covers
+     * {@link #setSourceTopic(String)} together with the base setters. No separate copy of the
+     * reflection code is needed here.
+     *
+     * @param props source of the property values
+     */
+    @Override
+    public void load(KeyValue props) {
+        super.load(props);
+    }
+
+    public String getSourceTopic() {
+        return sourceTopic;
+    }
+
+    public void setSourceTopic(String sourceTopic) {
+        this.sourceTopic = sourceTopic;
+    }
+}
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java
new file mode 100644
index 00000000..c3dce487
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+
+/**
+ * Source connector for MQTT. It checks the required configuration keys on start and hands the
+ * unchanged configuration to a single {@link MqttSourceTask}.
+ */
+public class MqttSourceConnector extends SourceConnector {
+
+    private KeyValue config;
+
+    /**
+     * Validates and stores the connector configuration.
+     *
+     * @param config connector configuration
+     * @throws ConnectException if a key listed in {@code SourceConnectorConfig.REQUEST_CONFIG}
+     *                          is absent from {@code config}
+     */
+    @Override
+    public void start(KeyValue config) {
+        for (String requiredKey : SourceConnectorConfig.REQUEST_CONFIG) {
+            if (config.containsKey(requiredKey)) {
+                continue;
+            }
+            throw new ConnectException("Request config key: " + requiredKey);
+        }
+        this.config = config;
+    }
+
+    /** Drops the stored configuration. */
+    @Override
+    public void stop() {
+        this.config = null;
+    }
+
+    /**
+     * Returns one task configuration, identical to the connector configuration.
+     *
+     * @param maxTasks not used by this implementation; exactly one entry is returned
+     * @return a list holding the stored connector configuration
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        final List<KeyValue> taskConfigList = new ArrayList<>();
+        taskConfigList.add(this.config);
+        return taskConfigList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MqttSourceTask.class;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java
new file mode 100644
index 00000000..33d31ef0
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source task that drains the {@link Replicator} queue and converts each MQTT message into a
+ * {@link ConnectRecord} with a single string field holding the payload.
+ */
+public class MqttSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttSourceTask.class);
+
+    private Replicator replicator;
+
+    private SourceConnectorConfig sourceConnectConfig;
+
+    /**
+     * Waits up to one second for the next queued MQTT message.
+     *
+     * @return a list with at most one record; empty when nothing arrived in time or an error
+     *         was logged
+     */
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        try {
+            MqttMessage message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                res.add(message2ConnectRecord(message));
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the thread that drives this task can still see the interrupt.
+            Thread.currentThread().interrupt();
+            log.error("mqtt task poll error, current config:" + describeConfig(), e);
+        } catch (Exception e) {
+            log.error("mqtt task poll error, current config:" + describeConfig(), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the task configuration and starts the replicator. Failures are logged, not rethrown.
+     *
+     * @param props task configuration
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.sourceConnectConfig = new SourceConnectorConfig();
+            this.sourceConnectConfig.load(props);
+            this.replicator = new Replicator(sourceConnectConfig);
+            this.replicator.start();
+        } catch (Exception e) {
+            log.error("mqtt task start failed.", e);
+        }
+    }
+
+    /** Stops the replicator if one was created. Failures are logged, not rethrown. */
+    @Override
+    public void stop() {
+        if (replicator == null) {
+            // start() was never called or failed before the replicator existed.
+            return;
+        }
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("mqtt task stop failed.", e);
+        }
+    }
+
+    /**
+     * Describes the configuration for log messages. Only the broker URL and the topic are
+     * included, so that the access key and secret key never reach the log.
+     */
+    private String describeConfig() {
+        if (sourceConnectConfig == null) {
+            return "null";
+        }
+        return "{mqttBrokerUrl=" + sourceConnectConfig.getMqttBrokerUrl()
+            + ", sourceTopic=" + sourceConnectConfig.getSourceTopic() + "}";
+    }
+
+    /** Wraps one MQTT message in a record stamped with the current time. */
+    private ConnectRecord message2ConnectRecord(MqttMessage message) {
+        Schema schema = SchemaBuilder.struct().name("topicName").build();
+        final List<Field> fields = buildFields();
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(),
+            buildRecordOffset(),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, message, schema));
+        connectRecord.setExtensions(buildExtendField());
+        return connectRecord;
+    }
+
+    /** Builds the record offset; the position is always reported as zero. */
+    private RecordOffset buildRecordOffset() {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(SourceConnectorConfig.POSITION, 0L);
+        return new RecordOffset(offsetMap);
+    }
+
+    /** Builds the record partition; every record uses the same fixed partition. */
+    private RecordPartition buildRecordPartition() {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", "defaultPartition");
+        return new RecordPartition(partitionMap);
+    }
+
+    /** Builds the field list of the record schema: one string field for the payload. */
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, SourceConnectorConfig.MESSAGE, stringSchema));
+        return fields;
+    }
+
+    /** Builds the record value from the message payload. */
+    private Struct buildPayLoad(List<Field> fields, MqttMessage message, Schema schema) {
+        Struct payLoad = new Struct(schema);
+        // Decode explicitly as UTF-8 instead of the platform default charset, so the same
+        // bytes produce the same text on every host.
+        payLoad.put(fields.get(0), new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
+        return payLoad;
+    }
+
+    /** Builds the (empty) record extensions. */
+    private KeyValue buildExtendField() {
+        return new DefaultKeyValue();
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java
new file mode 100644
index 00000000..ff5aa289
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.source;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.util.MqttConnectionUtil;
+import org.apache.rocketmq.connect.mqtt.util.Utils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribes to the configured MQTT topic and buffers every received message in an in-memory
+ * queue that the source task drains.
+ */
+public class Replicator {
+
+    private static final Logger log = LoggerFactory.getLogger(Replicator.class);
+
+    private MqttClient mqttClient;
+    private final SourceConnectorConfig sourceConnectConfig;
+    private final BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(SourceConnectorConfig sourceConnectConfig) {
+        this.sourceConnectConfig = sourceConnectConfig;
+    }
+
+    /**
+     * Creates the MQTT client, installs the callback and connects to the broker.
+     *
+     * <p>The topic subscription is issued from the {@code connectComplete} callback. A failed
+     * connect is logged rather than rethrown.
+     *
+     * @throws Exception if the client or its connect options cannot be created
+     */
+    public void start() throws Exception {
+        String brokerUrl = sourceConnectConfig.getMqttBrokerUrl();
+        MemoryPersistence memoryPersistence = new MemoryPersistence();
+        String sourceTopic = sourceConnectConfig.getSourceTopic();
+        String recvClientId = Utils.createClientId("recv");
+        MqttConnectOptions mqttConnectOptions = MqttConnectionUtil.buildMqttConnectOptions(recvClientId, sourceConnectConfig);
+        mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                log.info("{} connect success to {}", recvClientId, serverURI);
+                try {
+                    mqttClient.subscribe(sourceTopic, 1);
+                } catch (Exception e) {
+                    // Pass the exception so the cause of the failed subscription is logged.
+                    log.error("{} subscribe failed", recvClientId, e);
+                }
+            }
+
+            @Override
+            public void connectionLost(Throwable throwable) {
+                log.error("connection lost {}", throwable.getMessage(), throwable);
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage mqttMessage) {
+                try {
+                    commit(mqttMessage, true);
+                } catch (Exception e) {
+                    throw new ConnectException("commit MqttMessage failed", e);
+                }
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+            }
+        });
+
+        try {
+            mqttClient.connect(mqttConnectOptions);
+            log.info("Replicator start succeed");
+        } catch (Exception e) {
+            log.error("connect fail {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Disconnects from the broker if connected and releases the client's resources.
+     *
+     * <p>Does nothing when {@link #start()} never created a client.
+     *
+     * @throws Exception if disconnecting or closing the client fails
+     */
+    public void stop() throws Exception {
+        if (mqttClient == null) {
+            return;
+        }
+        // disconnect() fails on a client that is not connected, e.g. after a failed connect.
+        if (mqttClient.isConnected()) {
+            mqttClient.disconnect();
+        }
+        mqttClient.close();
+    }
+
+    /**
+     * Appends a received message to the queue.
+     *
+     * @param message    message to buffer
+     * @param isComplete not used by this implementation
+     */
+    public void commit(MqttMessage message, boolean isComplete) {
+        queue.add(message);
+    }
+
+    public SourceConnectorConfig getConfig() {
+        return this.sourceConnectConfig;
+    }
+
+    public BlockingQueue<MqttMessage> getQueue() {
+        return queue;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java
new file mode 100644
index 00000000..b23611e1
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    /**
+     * Copies values from {@code props} onto {@code object} through its public setters.
+     *
+     * <p>For every public single-argument method named {@code setXyz}, the property key
+     * {@code xyz} (first letter lower-cased) is looked up in {@code props}. When a value is
+     * present and the parameter type is one of int, long, double, boolean, float (primitive
+     * or boxed) or String, the value is converted and the setter is invoked. The copy is
+     * best-effort: values that cannot be parsed and setters that fail are skipped.
+     *
+     * @param props  source of the property values
+     * @param object target whose setters are invoked
+     */
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    /** Applies each matching property of {@code p} to the corresponding setter of {@code object}. */
+    private static void properties2Object(final KeyValue p, final Object object) {
+
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            Class<?>[] pt = method.getParameterTypes();
+            // Only "setX..." methods taking exactly one argument can be driven by one property.
+            // A method named exactly "set" has no property name to derive.
+            if (!mn.startsWith("set") || mn.length() < 4 || pt.length != 1) {
+                continue;
+            }
+            try {
+                // Locale.ROOT keeps the key stable regardless of the default locale
+                // (e.g. under a Turkish locale "I" would otherwise lower-case to a dotless i).
+                String key = mn.substring(3, 4).toLowerCase(java.util.Locale.ROOT) + mn.substring(4);
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Object arg = convert(property, pt[0]);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (RuntimeException | ReflectiveOperationException ignored) {
+                // Deliberately best-effort: an unparsable value or a failing setter must not
+                // prevent the remaining properties from being applied. JVM Errors are no
+                // longer swallowed here.
+            }
+        }
+    }
+
+    /**
+     * Converts {@code property} to {@code type}.
+     *
+     * @return the converted value, or {@code null} when {@code type} is not supported
+     * @throws NumberFormatException if a numeric conversion fails
+     */
+    private static Object convert(String property, Class<?> type) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(property);
+        } else if (type == long.class || type == Long.class) {
+            return Long.parseLong(property);
+        } else if (type == double.class || type == Double.class) {
+            return Double.parseDouble(property);
+        } else if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(property);
+        } else if (type == float.class || type == Float.class) {
+            return Float.parseFloat(property);
+        } else if (type == String.class) {
+            return property;
+        }
+        return null;
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java
new file mode 100644
index 00000000..c3c060b6
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/HmacSHA1Util.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.commons.codec.binary.Base64;
+
+public class HmacSHA1Util {
+    private static final Charset CHARSET = Charset.forName("UTF-8");
+    private static final String ALGORITHM = "HmacSHA1";
+
+    /**
+     * Computes the HmacSHA1 signature of {@code text} keyed with {@code secretKey}.
+     * Both strings are encoded as UTF-8.
+     *
+     * @param text      the text to sign
+     * @param secretKey the key used for the MAC
+     * @return the Base64 encoding of the MAC bytes
+     * @throws InvalidKeyException      if the key is rejected by the MAC implementation
+     * @throws NoSuchAlgorithmException if HmacSHA1 is not available
+     */
+    public static String macSignature(String text,
+        String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
+        Mac mac = Mac.getInstance(ALGORITHM);
+        mac.init(new SecretKeySpec(secretKey.getBytes(CHARSET), ALGORITHM));
+        byte[] bytes = mac.doFinal(text.getBytes(CHARSET));
+        return new String(Base64.encodeBase64(bytes), CHARSET);
+    }
+
+    /**
+     * Checks whether {@code input} is the UTF-8 encoded Base64 signature of {@code text}
+     * under {@code secretKey}.
+     *
+     * @param text      the text that was signed
+     * @param input     the signature bytes to verify; {@code null} never validates
+     * @param secretKey the key used for the MAC
+     * @return {@code true} when the signatures match
+     * @throws NoSuchAlgorithmException if HmacSHA1 is not available
+     * @throws InvalidKeyException      if the key is rejected by the MAC implementation
+     */
+    public static boolean validateSign(String text, byte[] input,
+        String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
+        if (input == null) {
+            return false;
+        }
+        String sign = macSignature(text, secretKey);
+        // Compare as bytes with MessageDigest.isEqual rather than String.equals, so the
+        // comparison does not stop at the first differing character and leak timing.
+        return java.security.MessageDigest.isEqual(sign.getBytes(CHARSET), input);
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java
new file mode 100644
index 00000000..2dea7314
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/MqttConnectionUtil.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.connect.mqtt.util;
+
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.rocketmq.connect.mqtt.config.ConnectorConfig;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+public class MqttConnectionUtil {
+    /**
+     * Builds the connect options for an MQTT client.
+     *
+     * <p>The options use a clean session, a keep-alive interval of 60, automatic reconnect
+     * and a max-inflight of 10000. The user name is the configured access key and the
+     * password is the {@link HmacSHA1Util#macSignature} of {@code clientId} under the
+     * configured secret key.
+     *
+     * @param clientId        client id that is signed to produce the password
+     * @param connectorConfig supplies the access key and secret key
+     * @return the populated connect options
+     * @throws NoSuchAlgorithmException if the signature algorithm is not available
+     * @throws InvalidKeyException      if the secret key is rejected
+     */
+    public static MqttConnectOptions buildMqttConnectOptions(
+        String clientId, ConnectorConfig connectorConfig) throws NoSuchAlgorithmException, InvalidKeyException {
+        final MqttConnectOptions options = new MqttConnectOptions();
+
+        // Session and transport behaviour.
+        options.setCleanSession(true);
+        options.setKeepAliveInterval(60);
+        options.setAutomaticReconnect(true);
+        options.setMaxInflight(10000);
+
+        // Credentials: access key as user name, signed client id as password.
+        options.setUserName(connectorConfig.getMqttAccessKey());
+        final String signature = HmacSHA1Util.macSignature(clientId, connectorConfig.getMqttSecretKey());
+        options.setPassword(signature.toCharArray());
+
+        return options;
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java
new file mode 100644
index 00000000..e3665618
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/util/Utils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    /**
+     * Creates a client id of the form {@code <prefix>-<current time in milliseconds>}.
+     *
+     * @param prefix text placed before the dash
+     * @return the prefix, a dash, and the value of {@link System#currentTimeMillis()}
+     */
+    public static String createClientId(String prefix) {
+        final long now = System.currentTimeMillis();
+        return prefix + "-" + now;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java
new file mode 100644
index 00000000..7514657d
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceConnectorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceConnector;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MqttSourceConnectorTest {
+
+    private final MqttSourceConnector connector = new MqttSourceConnector();
+
+    /** The connector must report {@link MqttSourceTask} as its task class. */
+    @Test
+    public void taskClassTest() {
+        // JUnit's argument order is (expected, actual); it determines the failure message.
+        assertEquals(MqttSourceTask.class, connector.taskClass());
+    }
+
+    /**
+     * Before {@code start}, the first task config is expected to be null; after
+     * {@code start}, it is expected to equal the configuration passed to {@code start}.
+     */
+    @Test
+    public void taskConfigsTest() {
+        org.junit.Assert.assertNull(connector.taskConfigs(2).get(0));
+        KeyValue keyValue = new DefaultKeyValue();
+        // Populate every required key; the value itself is irrelevant for this test.
+        for (String requestKey : SourceConnectorConfig.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.start(keyValue);
+        assertEquals(keyValue, connector.taskConfigs(2).get(0));
+    }
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java
new file mode 100644
index 00000000..50d11c46
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/connector/MqttSourceTaskTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.connector.MqttSourceTask;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.apache.rocketmq.connect.mqtt.util.HmacSHA1Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class MqttSourceTaskTest {
+    // NOTE(review): the broker address and credentials are fixed values; test() only does
+    // real work when a broker is reachable at this address.
+    private static final String BROKER_URL = "tcp://100.76.11.96:1883";
+    private static final String ACCESS_KEY = "rocketmq";
+    private static final String SECRET_KEY = "12345678";
+    private static final String TOPIC = "topic_producer";
+
+    private MqttClient mqttClient = null;
+
+    /**
+     * Creates the sending client and attempts to connect it. A failed connect is only
+     * printed, so tests that do not need the broker can still run.
+     */
+    @Before
+    public void before() throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
+        MemoryPersistence memoryPersistence = new MemoryPersistence();
+        String sendClientId = "send01";
+        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
+        mqttClient = new MqttClient(BROKER_URL, sendClientId, memoryPersistence);
+        mqttClient.setTimeToWait(5000L);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                System.out.println(sendClientId + " connect success to " + serverURI);
+            }
+
+            @Override
+            public void connectionLost(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage mqttMessage) {
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+            }
+        });
+        try {
+            mqttClient.connect(mqttConnectOptions);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Round trip against a live broker: publishes messages to the source topic and polls
+     * the task until five records have been received.
+     */
+    @Test
+    public void test() throws MqttException {
+        // Without a broker connection the publish would fail and the loop below could never
+        // finish, so the test is skipped instead of failing or hanging.
+        org.junit.Assume.assumeTrue(mqttClient.isConnected());
+
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("mqttBrokerUrl", BROKER_URL);
+        kv.put("mqttAccessKey", ACCESS_KEY);
+        kv.put("mqttSecretKey", SECRET_KEY);
+        kv.put("sourceTopic", TOPIC);
+        MqttSourceTask task = new MqttSourceTask();
+        task.start(kv);
+        // i counts received records, not loop iterations.
+        for (int i = 0; i < 5; ) {
+            Collection<ConnectRecord> sourceDataEntry = task.poll();
+            String msg = "r1_" + System.currentTimeMillis() + "_" + i;
+            MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
+            message.setQos(1);
+            mqttClient.publish(TOPIC, message);
+            System.out.println("send: " + TOPIC + ", " + msg);
+            i = i + sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+    }
+
+    /**
+     * Broker-independent check of {@code poll()}: one queued message yields one record,
+     * and an empty queue yields none. The replicator and config are injected by reflection.
+     */
+    @Test
+    public void pollTest() throws Exception {
+        MqttSourceTask task = new MqttSourceTask();
+        MqttMessage textMessage = new MqttMessage();
+        textMessage.setPayload("hello rocketmq".getBytes(StandardCharsets.UTF_8));
+
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        Field replicator = MqttSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = MqttSourceTask.class.getDeclaredField("sourceConnectConfig");
+        config.setAccessible(true);
+        config.set(task, new SourceConnectorConfig());
+
+        queue.put(textMessage);
+        Collection<ConnectRecord> list = task.poll();
+        Assert.assertEquals(1, list.size());
+
+        list = task.poll();
+        Assert.assertEquals(0, list.size());
+
+    }
+
+    /** Disconnects the sending client if it was created and is currently connected. */
+    @After
+    public void after() throws MqttException {
+        // disconnect() on a client that never connected throws, which would fail tests
+        // (such as pollTest) that do not depend on the broker at all.
+        if (mqttClient != null && mqttClient.isConnected()) {
+            mqttClient.disconnect();
+        }
+    }
+
+    /** Builds connect options for the sending client, signing {@code clientId} as the password. */
+    private static MqttConnectOptions buildMqttConnectOptions(
+        String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
+        MqttConnectOptions connOpts = new MqttConnectOptions();
+        connOpts.setCleanSession(true);
+        connOpts.setKeepAliveInterval(60);
+        connOpts.setAutomaticReconnect(true);
+        connOpts.setMaxInflight(10000);
+        connOpts.setUserName(ACCESS_KEY);
+        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, SECRET_KEY).toCharArray());
+        return connOpts;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java b/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java
new file mode 100644
index 00000000..6f8661df
--- /dev/null
+++ b/connectors/rocketmq-connect-mqtt/src/test/java/source/ReplicatorTest.java
@@ -0,0 +1,57 @@
+package source;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicatorTest {
+
+    private Replicator replicator;
+    private SourceConnectorConfig config;
+
+    /** Builds a replicator from a fixed configuration and starts it before each test. */
+    @Before
+    public void before() throws Exception {
+        config = new SourceConnectorConfig();
+        config.setMqttBrokerUrl("tcp://100.76.11.96:1883");
+        config.setMqttAccessKey("rocketmq");
+        config.setMqttSecretKey("12345678");
+        config.setSourceTopic("topic_producer");
+        replicator = new Replicator(config);
+        replicator.start();
+    }
+
+    /** Calls {@code stop()} and expects it to complete without throwing. */
+    @Test
+    public void stop() throws Exception {
+        replicator.stop();
+    }
+
+    /** A message passed to {@code commit} must be the next element polled from the queue. */
+    @Test
+    public void commitAddGetQueueTest() {
+        MqttMessage message = new MqttMessage();
+        replicator.commit(message, false);
+        // JUnit's argument order is (expected, actual).
+        Assert.assertEquals(message, replicator.getQueue().poll());
+    }
+
+    /** {@code getConfig()} must hand back the very instance given to the constructor. */
+    @Test
+    public void getConfigTest() {
+        Assert.assertSame(config, replicator.getConfig());
+    }
+}