You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:55 UTC

[rocketmq-connect] 02/06: [ISSUE #302] Implement rocketmq connect jms (#303)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 78cc90208215b2f9a9998149be968a7b76575d36
Author: githublaohu <23...@qq.com>
AuthorDate: Fri Jun 21 15:38:21 2019 +0800

    [ISSUE #302] Implement rocketmq connect jms (#303)
---
 README-CN.md                                       |  16 ++
 README.md                                          |  15 ++
 pom.xml                                            | 201 +++++++++++++++++++++
 .../org/apache/rocketmq/connect/jms/Config.java    | 165 +++++++++++++++++
 .../org/apache/rocketmq/connect/jms/ErrorCode.java |   8 +
 .../apache/rocketmq/connect/jms/Replicator.java    |  67 +++++++
 .../jms/connector/BaseJmsSourceConnector.java      |  73 ++++++++
 .../connect/jms/connector/BaseJmsSourceTask.java   | 148 +++++++++++++++
 .../connect/jms/pattern/PatternProcessor.java      |  90 +++++++++
 .../rocketmq/connect/jms/ReplicatorTest.java       |  74 ++++++++
 .../jms/connector/ActivemqSourceTaskTest.java      | 164 +++++++++++++++++
 .../jms/connector/BaseJmsSourceConnectorTest.java  |  82 +++++++++
 12 files changed, 1103 insertions(+)

diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..be03683
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActiveConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|--- |--- | ---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 |  无
+activemq.password|  密码    | 否  | 无
+jms.destination.name | 读取的队列或者主题名   |  是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器    |  否  |无
+jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e15149e 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,16 @@
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
 
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+activemq.url | The URL of the ActiveMQ broker | yes | null
+activemq.username | The username to use when connecting to ActiveMQ | no |  null
+activemq.password|  The password to use when connecting to ActiveMQ    | no  | null
+jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
+jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
+jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | no | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag that determines whether the session is transacted; when it is, the session completely controls message delivery by either committing or rolling back      | no | false
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c7c3ba2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-jms</artifactId>
+    <version>1.0.0</version>
+
+    <name>connect-jms</name>
+    <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jms</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-all</artifactId>
+            <version>5.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>javax.jms-api</artifactId>
+            <version>2.0</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
new file mode 100644
index 0000000..e9cd178
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+import javax.jms.Session;
+
+/**
+ * Settings of a JMS source task: broker address, credentials, destination and session options.
+ *
+ * <p>Values are filled in from a {@link KeyValue} by {@link #load(KeyValue)}, which matches every
+ * public single-argument setter {@code setXxx} of this class against the key {@code xxx}.
+ */
+public class Config {
+
+    /** Property names treated as required configuration keys (the consumer of this set lives elsewhere). */
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("brokerUrl");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String brokerUrl;
+
+    public String username;
+
+    public String password;
+
+    public String destinationType;
+
+    public String destinationName;
+
+    public String messageSelector;
+
+    private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+    private Boolean sessionTransacted = Boolean.FALSE;
+
+    /**
+     * Copies every recognised property from {@code props} into this object.
+     *
+     * @param props key/value pairs keyed by property name, e.g. {@code brokerUrl}
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Populates {@code object} through its public setters, using the values found in {@code p}.
+     *
+     * <p>Population is best effort: a key that is absent, a value that cannot be parsed, or a
+     * setter that fails leaves the corresponding field at its current value.
+     *
+     * @param p      source of the property values
+     * @param object target whose {@code setXxx} methods are invoked
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            Class<?>[] pt = method.getParameterTypes();
+            // A property setter is "set" plus at least one more character and takes exactly one argument.
+            if (!mn.startsWith("set") || mn.length() < 4 || pt.length != 1) {
+                continue;
+            }
+            // Character.toLowerCase is locale-independent, unlike String.toLowerCase().
+            String key = Character.toLowerCase(mn.charAt(3)) + mn.substring(4);
+            try {
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Object arg = convert(property, pt[0]);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (ReflectiveOperationException | RuntimeException ignored) {
+                // Deliberately skipped so that one bad property does not abort loading the rest.
+                // Errors (e.g. OutOfMemoryError) are no longer swallowed.
+            }
+        }
+    }
+
+    /**
+     * Converts a textual property to the parameter type of a setter.
+     *
+     * @param value non-null property text
+     * @param type  parameter type declared by the setter
+     * @return the converted value, or {@code null} when {@code type} is not supported
+     * @throws NumberFormatException if {@code value} is not a valid number for a numeric {@code type}
+     */
+    private static Object convert(String value, Class<?> type) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(value);
+        }
+        if (type == long.class || type == Long.class) {
+            return Long.parseLong(value);
+        }
+        if (type == double.class || type == Double.class) {
+            return Double.parseDouble(value);
+        }
+        if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(value);
+        }
+        if (type == float.class || type == Float.class) {
+            return Float.parseFloat(value);
+        }
+        if (type == String.class) {
+            return value;
+        }
+        return null;
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public Integer getSessionAcknowledgeMode() {
+        return sessionAcknowledgeMode;
+    }
+
+    public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+        this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+    }
+
+    public Boolean getSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
new file mode 100644
index 0000000..b6dafb1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.jms;
+
+/**
+ * Numeric codes attached to the exceptions raised by the JMS source task.
+ *
+ * <p>This class only holds constants and is not meant to be instantiated.
+ */
+public final class ErrorCode {
+
+    /** Code reported when the source task fails to start. */
+    public static final int START_ERROR_CODE = 10001;
+
+    /** Code reported when the source task fails to stop. */
+    public static final int STOP_ERROR_CODE = 10002;
+
+    private ErrorCode() {
+        // constants holder
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
new file mode 100644
index 0000000..3e859a4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Message;
+
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connects a JMS {@link PatternProcessor} to the source task: messages handed to
+ * {@link #commit(Message, boolean)} are buffered in the queue returned by {@link #getQueue()}.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    // Not final: assigned in start(); the unit test also injects it reflectively by this name.
+    private PatternProcessor processor;
+
+    private final Config config;
+
+    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+    private final BaseJmsSourceTask baseJmsSourceTask;
+
+    /**
+     * @param config            settings shared with the processor
+     * @param baseJmsSourceTask task that supplies the {@link PatternProcessor} in {@link #start()}
+     */
+    public Replicator(Config config, BaseJmsSourceTask baseJmsSourceTask) {
+        this.config = config;
+        this.baseJmsSourceTask = baseJmsSourceTask;
+    }
+
+    /**
+     * Obtains the processor from the task and starts it.
+     *
+     * @throws Exception if the processor cannot be created or started
+     */
+    public void start() throws Exception {
+        processor = baseJmsSourceTask.getPatternProcessor(this);
+        processor.start();
+        LOGGER.info("Replicator start succeed");
+    }
+
+    /**
+     * Stops the processor. Does nothing when no processor was ever assigned, so that stopping
+     * after a failed or missing {@link #start()} is not itself an error.
+     *
+     * @throws Exception if the processor fails to stop
+     */
+    public void stop() throws Exception {
+        if (processor == null) {
+            return;
+        }
+        processor.stop();
+    }
+
+    /**
+     * Buffers a received message for the task to pick up.
+     *
+     * @param message    message to enqueue
+     * @param isComplete currently unused
+     */
+    public void commit(Message message, boolean isComplete) {
+        queue.add(message);
+    }
+
+    public Config getConfig() {
+        return this.config;
+    }
+
+    public BlockingQueue<Message> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
new file mode 100644
index 0000000..f939881
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+
+/**
+ * Base source connector for JMS brokers. It checks that the keys named by
+ * {@link #getRequiredConfig()} are present and hands the accepted configuration to a single task.
+ */
+public abstract class BaseJmsSourceConnector extends SourceConnector {
+
+    /** Configuration accepted by the last successful {@link #verifyAndSetConfig(KeyValue)} call. */
+    private KeyValue config;
+
+    /**
+     * Verifies that every required key is present and, if so, stores the configuration.
+     *
+     * @param config configuration to verify
+     * @return an empty string on success, otherwise a message naming the first missing key
+     */
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        final Set<String> requiredKeys = getRequiredConfig();
+        for (final String requiredKey : requiredKeys) {
+            boolean present = config.containsKey(requiredKey);
+            if (!present) {
+                return "Request config key: " + requiredKey;
+            }
+        }
+        this.config = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+        // nothing to do
+    }
+
+    @Override
+    public void stop() {
+        // nothing to do
+    }
+
+    @Override
+    public void pause() {
+        // nothing to do
+    }
+
+    @Override
+    public void resume() {
+        // nothing to do
+    }
+
+    @Override
+    public abstract Class<? extends Task> taskClass();
+
+    /**
+     * @return a new list holding the stored configuration as its only element
+     */
+    @Override
+    public List<KeyValue> taskConfigs() {
+        final List<KeyValue> taskConfigList = new ArrayList<>();
+        taskConfigList.add(this.config);
+        return taskConfigList;
+    }
+
+    /**
+     * @return the configuration keys that must be present for verification to succeed
+     */
+    abstract Set<String> getRequiredConfig();
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
new file mode 100644
index 0000000..f43e7fb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.ErrorCode;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
+import io.openmessaging.connector.api.source.SourceTask;
+
+/**
+ * Base source task that drains JMS messages buffered by a {@link Replicator} and converts each
+ * one into a {@link SourceDataEntry}. Subclasses supply the broker-specific
+ * {@link PatternProcessor} through {@link #getPatternProcessor(Replicator)}.
+ */
+public abstract class BaseJmsSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(BaseJmsSourceTask.class);
+
+    /** Charset used for every String-to-bytes conversion in this task. */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    private Replicator replicator;
+
+    private Config config;
+
+    private ByteBuffer sourcePartition;
+
+    /**
+     * Waits up to one second for a buffered message and converts it.
+     *
+     * @return a list with at most one entry; empty when no message arrived, the wait was
+     *         interrupted, or the message could not be converted
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageContent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
+                res.add(sourceDataEntry);
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the caller can observe the interruption.
+            Thread.currentThread().interrupt();
+            log.warn("activemq task poll interrupted, destination: {}", describeDestination(), e);
+        } catch (Exception e) {
+            // The whole config is intentionally not logged: it contains the broker password.
+            log.error("activemq task poll error, destination: {}", describeDestination(), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the configuration and starts the replicator.
+     *
+     * @param props task configuration
+     * @throws DataConnectException with {@link ErrorCode#START_ERROR_CODE} if anything fails
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes(UTF_8));
+            this.replicator = new Replicator(config, this);
+            this.replicator.start();
+        } catch (Exception e) {
+            log.error("activemq task start failed.", e);
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the replicator, if one was created.
+     *
+     * @throws DataConnectException with {@link ErrorCode#STOP_ERROR_CODE} if stopping fails
+     */
+    @Override
+    public void stop() {
+        if (replicator == null) {
+            // start() never got far enough to create it; nothing to release.
+            return;
+        }
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("activemq task stop failed.", e);
+            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void pause() {
+        // nothing to do
+    }
+
+    @Override
+    public void resume() {
+        // nothing to do
+    }
+
+    /**
+     * Extracts the body of a JMS message as bytes.
+     *
+     * <ul>
+     * <li>{@link TextMessage}: the text encoded as UTF-8 (empty when the text is {@code null})</li>
+     * <li>{@link ObjectMessage}: the object serialised to JSON</li>
+     * <li>{@link BytesMessage}: the raw body</li>
+     * <li>{@link MapMessage}: all name/value pairs serialised to one JSON object</li>
+     * <li>{@link StreamMessage}: the bytes read through {@code readBytes} until it returns -1</li>
+     * </ul>
+     *
+     * @param message message to convert
+     * @return buffer wrapping the extracted bytes
+     * @throws JMSException     if the provider fails to read the message
+     * @throws RuntimeException if the message is of none of the types above
+     */
+    @SuppressWarnings("unchecked")
+    public ByteBuffer getMessageContent(Message message) throws JMSException {
+        byte[] data;
+        if (message instanceof TextMessage) {
+            String text = ((TextMessage) message).getText();
+            // Explicit charset: the platform default would make the output machine-dependent.
+            data = text == null ? new byte[0] : text.getBytes(UTF_8);
+        } else if (message instanceof ObjectMessage) {
+            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            data = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(data);
+        } else if (message instanceof MapMessage) {
+            MapMessage mapMessage = (MapMessage) message;
+            Map<String, Object> map = new HashMap<>();
+            Enumeration<Object> names = mapMessage.getMapNames();
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                map.put(name, mapMessage.getObject(name));
+            }
+            data = JSON.toJSONBytes(map);
+        } else if (message instanceof StreamMessage) {
+            StreamMessage streamMessage = (StreamMessage) message;
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte[] chunk = new byte[1024];
+            int read;
+            while ((read = streamMessage.readBytes(chunk)) != -1) {
+                out.write(chunk, 0, read);
+            }
+            data = out.toByteArray();
+        } else {
+            // The exception is printed and does not need to be written as a DataConnectException
+            throw new RuntimeException("message type exception");
+        }
+        return ByteBuffer.wrap(data);
+    }
+
+    /**
+     * Creates the processor that receives messages from the concrete broker.
+     *
+     * @param replicator replicator the processor delivers messages to
+     * @return the processor to start
+     */
+    public abstract PatternProcessor getPatternProcessor(Replicator replicator);
+
+    /** Describes the configured destination for log messages without exposing credentials. */
+    private String describeDestination() {
+        if (config == null) {
+            return "<not started>";
+        }
+        return config.getDestinationType() + ":" + config.getDestinationName();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
new file mode 100644
index 0000000..e03691f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+
+/**
+ * Opens a JMS connection, session and consumer for the configured destination and forwards
+ * every received message to the {@link Replicator}. Subclasses provide the broker-specific
+ * {@link ConnectionFactory}.
+ */
+public abstract class PatternProcessor {
+
+    private Replicator replicator;
+
+    Config config;
+
+    private Connection connection;
+
+    private Session session;
+
+    private MessageConsumer consumer;
+
+    /**
+     * @param replicator replicator that receives the consumed messages and supplies the config
+     */
+    public PatternProcessor(Replicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    /**
+     * @return the factory used to open the connection to the broker
+     */
+    public abstract ConnectionFactory connectionFactory();
+
+    /**
+     * Connects to the broker and registers a listener on the configured destination.
+     *
+     * <p>If any step fails, whatever was already opened is closed before the failure is rethrown.
+     *
+     * @throws RuntimeException if the destination type is neither {@code topic} nor {@code queue}
+     * @throws Exception        if the JMS provider fails to connect or subscribe
+     */
+    public void start() throws Exception {
+        final boolean topic = StringUtils.equals("topic", config.getDestinationType());
+        if (!topic && !StringUtils.equals("queue", config.getDestinationType())) {
+            // RuntimeException is caught by DataConnectException
+            throw new RuntimeException("destination type is incorrectness");
+        }
+
+        try {
+            ConnectionFactory connectionFactory = connectionFactory();
+
+            if (StringUtils.isNotBlank(config.getUsername())
+                && StringUtils.isNotBlank(config.getPassword())) {
+                connection = connectionFactory.createConnection(config.getUsername(), config.getPassword());
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+            connection.start();
+            // Assign the field (not a local) so that stop() can close the session.
+            session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+            Destination destination = topic
+                ? session.createTopic(config.getDestinationName())
+                : session.createQueue(config.getDestinationName());
+            consumer = session.createConsumer(destination, config.getMessageSelector());
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    replicator.commit(message, true);
+                }
+            });
+        } catch (Exception e) {
+            try {
+                closeResources();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the consumer, session and connection that {@link #start()} opened.
+     * Safe to call when {@link #start()} was never called or did not complete.
+     *
+     * @throws Exception if closing any of the resources fails
+     */
+    public void stop() throws Exception {
+        closeResources();
+    }
+
+    /** Closes consumer, then session, then connection; later ones are closed even if an earlier close fails. */
+    private void closeResources() throws Exception {
+        try {
+            if (consumer != null) {
+                consumer.close();
+            }
+        } finally {
+            try {
+                if (session != null) {
+                    session.close();
+                }
+            } finally {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
new file mode 100644
index 0000000..32b8f04
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.lang.reflect.Field;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.junit.Assert;
+
+/**
+ * Unit tests for {@link Replicator}. The replicator's {@code processor} field is replaced
+ * with a Mockito mock of {@link PatternProcessor} before each test.
+ */
+public class ReplicatorTest {
+
+    Replicator replicator;
+
+    PatternProcessor patternProcessor;
+
+    Config config;
+
+    /**
+     * Builds a replicator around a default {@link Config} and injects the mocked processor
+     * through reflection, since the field is not otherwise reachable from the test.
+     */
+    @Before
+    public void before() throws ReflectiveOperationException {
+        config = new Config();
+        replicator = new Replicator(config, null);
+
+        patternProcessor = Mockito.mock(PatternProcessor.class);
+
+        Field processor = Replicator.class.getDeclaredField("processor");
+        processor.setAccessible(true);
+        processor.set(replicator, patternProcessor);
+    }
+
+    /** Expects {@code start()} to raise a {@link RuntimeException} with this fixture. */
+    @Test(expected = RuntimeException.class)
+    public void startTest() throws Exception {
+        replicator.start();
+    }
+
+    /** Stopping the replicator must stop the processor exactly once. */
+    @Test
+    public void stop() throws Exception {
+        replicator.stop();
+        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+    }
+
+    /** A committed message must be the next element polled from the replicator queue. */
+    @Test
+    public void commitAddGetQueueTest() {
+        Message message = new ActiveMQTextMessage();
+        replicator.commit(message, false);
+        // JUnit convention: expected value first, actual value second.
+        Assert.assertEquals(message, replicator.getQueue().poll());
+    }
+
+    /** The replicator must expose the configuration it was constructed with. */
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(config, replicator.getConfig());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
new file mode 100644
index 0000000..72e818a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Tests for {@link ActivemqSourceTask}: polling messages from the replicator queue and
+ * converting the JMS message types into a {@link ByteBuffer}.
+ */
+public class ActivemqSourceTaskTest {
+
+    /**
+     * Broker address used by the manual helpers {@link #befores()} and {@link #test()}.
+     * The methods annotated with {@code @Test} in this class do not reference it.
+     */
+    private static final String BROKER_URL = "tcp://112.74.48.251:6166";
+
+    /**
+     * Manual helper that publishes 20 non-persistent text messages to {@code test-queue}
+     * in a transacted session. It is not annotated, so JUnit does not run it.
+     *
+     * @throws JMSException if the broker cannot be reached or a JMS call fails
+     */
+    public void befores() throws JMSException, InterruptedException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue("test-queue");
+
+            MessageProducer producer = session.createProducer(destination);
+
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+                producer.send(message);
+            }
+
+            session.commit();
+            session.close();
+        } finally {
+            // Always release the connection, even when sending or committing fails.
+            connection.close();
+        }
+    }
+
+    // Manual check against the broker at BROKER_URL; the annotation is left commented out,
+    // presumably because it needs a reachable broker -- TODO confirm.
+    //@Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("activemqUrl", BROKER_URL);
+        kv.put("destinationType", "queue");
+        kv.put("destinationName", "test-queue");
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        task.start(kv);
+        for (int i = 0; i < 20; ) {
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            i = i + sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+        Thread.sleep(20000);
+    }
+
+    /**
+     * With one message in the (mocked) replicator queue, the first {@code poll()} must
+     * return one entry and the second must return none.
+     */
+    @Test
+    public void pollTest() throws Exception {
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText("hello rocketmq");
+
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        // Inject the collaborators through reflection instead of calling task.start(...).
+        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = ActivemqSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        config.set(task, new Config());
+
+        queue.put(textMessage);
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(1, list.size());
+
+        list = task.poll();
+        Assert.assertEquals(0, list.size());
+
+    }
+
+    /**
+     * Checks the conversion of text, object, bytes, map and stream messages, then expects
+     * a {@link RuntimeException} when a {@code null} message is converted.
+     *
+     * <p>NOTE(review): because the expectation is declared on the annotation, a
+     * {@code RuntimeException} thrown by any earlier statement would also satisfy it.
+     */
+    @Test(expected = RuntimeException.class)
+    public void getMessageConnentTest() throws JMSException {
+        String value = "hello rocketmq";
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText(value);
+        ByteBuffer buffer = task.getMessageConnent(textMessage);
+        Assert.assertEquals(textMessage.getText(), new String(buffer.array()));
+
+        ObjectMessage objectMessage = new ActiveMQObjectMessage();
+        objectMessage.setObject(value);
+        buffer = task.getMessageConnent(objectMessage);
+        Assert.assertEquals("\"" + objectMessage.getObject().toString() + "\"", new String(buffer.array()));
+
+        BytesMessage bytes = new ActiveMQBytesMessage();
+        bytes.writeBytes(value.getBytes());
+        bytes.reset();
+        buffer = task.getMessageConnent(bytes);
+        Assert.assertEquals(value, new String(buffer.array()));
+
+        MapMessage mapMessage = new ActiveMQMapMessage();
+        mapMessage.setString("hello", "rocketmq");
+        buffer = task.getMessageConnent(mapMessage);
+        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        Assert.assertEquals("rocketmq", map.get("hello"));
+        Assert.assertEquals(1, map.size());
+
+        StreamMessage streamMessage = new ActiveMQStreamMessage();
+        // Build the payload with a StringBuilder; starting from a null String would
+        // prefix the payload with the literal text "null".
+        StringBuilder repeated = new StringBuilder();
+        for (int i = 0; i < 200; i++) {
+            repeated.append(value);
+        }
+        String valueTwo = repeated.toString();
+        streamMessage.writeBytes(valueTwo.getBytes());
+        streamMessage.reset();
+        buffer = task.getMessageConnent(streamMessage);
+        Assert.assertEquals(valueTwo, new String(buffer.array()));
+
+        task.getMessageConnent(null);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
new file mode 100644
index 0000000..6c4029a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Tests for {@link BaseJmsSourceConnector}, exercised through a minimal anonymous subclass.
+ */
+public class BaseJmsSourceConnectorTest {
+
+    /** Keys reported as required by the connector under test. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        // Populated in a static block rather than with double-brace initialisation, which
+        // would create an extra anonymous HashSet subclass.
+        REQUEST_CONFIG.add("activemqUrl");
+        REQUEST_CONFIG.add("destinationType");
+        REQUEST_CONFIG.add("destinationName");
+    }
+
+    BaseJmsSourceConnector connector = new BaseJmsSourceConnector() {
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return BaseJmsSourceTask.class;
+        }
+
+        @Override
+        Set<String> getRequiredConfig() {
+            return REQUEST_CONFIG;
+        }
+    };
+
+    /**
+     * Adds the required keys one at a time and checks the reported missing key before each
+     * addition; once all keys are present the result must be the empty string.
+     *
+     * <p>NOTE(review): the loop iterates {@code Config.REQUEST_CONFIG} while the connector
+     * above declares {@link #REQUEST_CONFIG} as its required keys -- confirm the two sets
+     * are meant to match.
+     */
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals("Request config key: " + requestKey, connector.verifyAndSetConfig(keyValue));
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+    }
+
+    /** The connector must report the task class supplied by the subclass. */
+    @Test
+    public void taskClassTest() {
+        assertEquals(BaseJmsSourceTask.class, connector.taskClass());
+    }
+
+    /**
+     * Before configuration the first task config is {@code null}; after a successful
+     * {@code verifyAndSetConfig} it must equal the supplied key-value.
+     */
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(null, connector.taskConfigs().get(0));
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(keyValue, connector.taskConfigs().get(0));
+    }
+}