You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:55 UTC
[rocketmq-connect] 02/06: [ISSUE #302] Implement rocketmq connect jms (#303)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 78cc90208215b2f9a9998149be968a7b76575d36
Author: githublaohu <23...@qq.com>
AuthorDate: Fri Jun 21 15:38:21 2019 +0800
[ISSUE #302] Implement rocketmq connect jms (#303)
---
README-CN.md | 16 ++
README.md | 15 ++
pom.xml | 201 +++++++++++++++++++++
.../org/apache/rocketmq/connect/jms/Config.java | 165 +++++++++++++++++
.../org/apache/rocketmq/connect/jms/ErrorCode.java | 8 +
.../apache/rocketmq/connect/jms/Replicator.java | 67 +++++++
.../jms/connector/BaseJmsSourceConnector.java | 73 ++++++++
.../connect/jms/connector/BaseJmsSourceTask.java | 148 +++++++++++++++
.../connect/jms/pattern/PatternProcessor.java | 90 +++++++++
.../rocketmq/connect/jms/ReplicatorTest.java | 74 ++++++++
.../jms/connector/ActivemqSourceTaskTest.java | 164 +++++++++++++++++
.../jms/connector/BaseJmsSourceConnectorTest.java | 82 +++++++++
12 files changed, 1103 insertions(+)
diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..be03683
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActiveConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|--- |--- | ---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 | 无
+activemq.password| 密码 | 否 | 无
+jms.destination.name | 读取的队列或者主题名 | 是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器 | 否 |无
+jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话 | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e15149e 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,16 @@
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+activemq.url | The URL of the ActiveMQ broker | yes | null
+activemq.username | The username to use when connecting to ActiveMQ | no | null
+activemq.password| The password to use when connecting to ActiveMQ | no | null
+jms.destination.name | The name of the JMS destination (queue or topic) to read from | yes | null
+jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
+jms.message.selector | The message selector that should be applied to messages in the destination | no | null
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session | no | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag to determine whether the session is transacted; if it is, the session completely controls message delivery by either committing or rolling back | no | false
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c7c3ba2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-jms</artifactId>
+ <version>1.0.0</version>
+
+ <name>connect-jms</name>
+ <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jms</url>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.0-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.51</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-all</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>javax.jms-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
new file mode 100644
index 0000000..e9cd178
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+import javax.jms.Session;
+
+/**
+ * Settings for a JMS source task.
+ *
+ * <p>Values are copied from a {@link KeyValue} by {@link #load(KeyValue)}, which matches each
+ * public setter {@code setXyz} against the key {@code xyz}.
+ */
+public class Config {
+
+    /**
+     * Holds the key names {@code brokerUrl}, {@code destinationType} and {@code destinationName}.
+     * NOTE(review): presumably the mandatory settings checked by a connector - confirm at the caller.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>();
+
+    static {
+        REQUEST_CONFIG.add("brokerUrl");
+        REQUEST_CONFIG.add("destinationType");
+        REQUEST_CONFIG.add("destinationName");
+    }
+
+    public String brokerUrl;
+
+    public String username;
+
+    public String password;
+
+    public String destinationType;
+
+    public String destinationName;
+
+    public String messageSelector;
+
+    private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+    private Boolean sessionTransacted = Boolean.FALSE;
+
+    /**
+     * Populates this object from the given properties.
+     *
+     * @param props source of the setting values, looked up by setter-derived key
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Calls every public single-argument setter of {@code object} whose derived key has a value
+     * in {@code p} and whose parameter type is a supported primitive, wrapper or String.
+     * Any failure for one setter is ignored and the remaining setters are still processed.
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method setter : object.getClass().getMethods()) {
+            String setterName = setter.getName();
+            if (!setterName.startsWith("set")) {
+                continue;
+            }
+            try {
+                // "setBrokerUrl" -> "brokerUrl"; a method named exactly "set" fails here and is skipped.
+                String remainder = setterName.substring(4);
+                String key = setterName.substring(3, 4).toLowerCase() + remainder;
+                String raw = p.getString(key);
+                if (raw == null) {
+                    continue;
+                }
+                Class<?>[] parameterTypes = setter.getParameterTypes();
+                if (parameterTypes == null || parameterTypes.length == 0) {
+                    continue;
+                }
+                Object converted = convert(parameterTypes[0].getSimpleName(), raw);
+                if (converted != null) {
+                    setter.invoke(object, converted);
+                }
+            } catch (Throwable ignored) {
+            }
+        }
+    }
+
+    /**
+     * Parses {@code raw} into the type named by {@code typeName}.
+     *
+     * @return the parsed value, or {@code null} when the type is not supported
+     */
+    private static Object convert(String typeName, String raw) {
+        switch (typeName) {
+            case "int":
+            case "Integer":
+                return Integer.parseInt(raw);
+            case "long":
+            case "Long":
+                return Long.parseLong(raw);
+            case "double":
+            case "Double":
+                return Double.parseDouble(raw);
+            case "boolean":
+            case "Boolean":
+                return Boolean.parseBoolean(raw);
+            case "float":
+            case "Float":
+                return Float.parseFloat(raw);
+            case "String":
+                return raw;
+            default:
+                return null;
+        }
+    }
+
+    public String getBrokerUrl() {
+        return this.brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getUsername() {
+        return this.username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return this.password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getDestinationType() {
+        return this.destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getDestinationName() {
+        return this.destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public String getMessageSelector() {
+        return this.messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public Integer getSessionAcknowledgeMode() {
+        return this.sessionAcknowledgeMode;
+    }
+
+    public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+        this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+    }
+
+    public Boolean getSessionTransacted() {
+        return this.sessionTransacted;
+    }
+
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
new file mode 100644
index 0000000..b6dafb1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.jms;
+
+/** Numeric error codes attached to the exceptions raised by the JMS source task. */
+public class ErrorCode {
+
+    /** Code used when a task fails to start. */
+    public static final int START_ERROR_CODE = 10001;
+
+    /** Code used when a task fails to stop. */
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
new file mode 100644
index 0000000..3e859a4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Message;
+
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Buffers JMS messages between a {@link PatternProcessor}, which delivers them through
+ * {@link #commit(Message, boolean)}, and whoever drains {@link #getQueue()}.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private Config config;
+
+    private BaseJmsSourceTask baseJmsSourceTask;
+
+    /** Created by {@link #start()}; remains unset until then. */
+    private PatternProcessor processor;
+
+    /** Unbounded hand-off buffer for received messages. */
+    private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * @param config settings exposed through {@link #getConfig()}
+     * @param baseJmsSourceTask task asked for the processor when {@link #start()} runs
+     */
+    public Replicator(Config config , BaseJmsSourceTask baseJmsSourceTask) {
+        this.baseJmsSourceTask = baseJmsSourceTask;
+        this.config = config;
+    }
+
+    /**
+     * Obtains a processor from the owning task and starts it.
+     *
+     * @throws Exception if the processor cannot be obtained or started
+     */
+    public void start() throws Exception {
+        this.processor = this.baseJmsSourceTask.getPatternProcessor(this);
+        this.processor.start();
+        LOGGER.info("Replicator start succeed");
+    }
+
+    /**
+     * Stops the processor obtained by {@link #start()}.
+     *
+     * @throws Exception if the processor fails to stop
+     */
+    public void stop() throws Exception {
+        this.processor.stop();
+    }
+
+    /**
+     * Appends a message to the buffer.
+     *
+     * @param message the received message
+     * @param isComplete currently not consulted
+     */
+    public void commit(Message message, boolean isComplete) {
+        this.queue.add(message);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public BlockingQueue<Message> getQueue() {
+        return this.queue;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
new file mode 100644
index 0000000..f939881
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+
+/**
+ * Skeleton source connector for JMS brokers. Subclasses supply the task class and the set of
+ * configuration keys that must be present.
+ */
+public abstract class BaseJmsSourceConnector extends SourceConnector {
+
+    /** Configuration accepted by the last successful {@link #verifyAndSetConfig(KeyValue)}. */
+    private KeyValue config;
+
+    /**
+     * Checks that every key from {@link #getRequiredConfig()} is present and, if so, stores the
+     * configuration.
+     *
+     * @param config configuration to validate
+     * @return an empty string on success, otherwise a message naming the first missing key
+     */
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : getRequiredConfig()) {
+            if (config.containsKey(requestKey)) {
+                continue;
+            }
+            return "Request config key: " + requestKey;
+        }
+        this.config = config;
+        return "";
+    }
+
+    /** No-op. */
+    @Override
+    public void start() {
+    }
+
+    /** No-op. */
+    @Override
+    public void stop() {
+    }
+
+    /** No-op. */
+    @Override
+    public void pause() {
+    }
+
+    /** No-op. */
+    @Override
+    public void resume() {
+    }
+
+    @Override
+    public abstract Class<? extends Task> taskClass();
+
+    /**
+     * @return a new single-element list holding the stored configuration
+     */
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> taskConfigList = new ArrayList<>();
+        taskConfigList.add(this.config);
+        return taskConfigList;
+    }
+
+    /**
+     * @return the keys that {@link #verifyAndSetConfig(KeyValue)} requires to be present
+     */
+    abstract Set<String> getRequiredConfig();
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
new file mode 100644
index 0000000..f43e7fb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.ErrorCode;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
+import io.openmessaging.connector.api.source.SourceTask;
+
+/**
+ * Source task that drains JMS messages buffered by a {@link Replicator} and converts each one
+ * into a {@link SourceDataEntry}. Subclasses provide the broker-specific
+ * {@link PatternProcessor}.
+ */
+public abstract class BaseJmsSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(BaseJmsSourceTask.class);
+
+    /** Fixed charset so encoded bytes do not depend on the JVM's platform default. */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    private Replicator replicator;
+
+    private Config config;
+
+    private ByteBuffer sourcePartition;
+
+    /**
+     * Waits up to one second for a buffered message and converts it.
+     *
+     * @return a list with at most one entry; empty when nothing arrived or conversion failed
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageContent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
+                res.add(sourceDataEntry);
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the thread that owns this task can observe the interruption.
+            Thread.currentThread().interrupt();
+            log.error("activemq task poll interrupted, destination: {}", describeDestination(), e);
+        } catch (Exception e) {
+            // Only the destination is logged: the full config also carries the broker password.
+            log.error("activemq task poll error, destination: {}", describeDestination(), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the configuration and starts the replicator.
+     *
+     * @param props task configuration
+     * @throws DataConnectException with {@link ErrorCode#START_ERROR_CODE} on any failure
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes(UTF_8));
+            this.replicator = new Replicator(config,this);
+            this.replicator.start();
+        } catch (Exception e) {
+            log.error("activemq task start failed.", e);
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the replicator, if one was created.
+     *
+     * @throws DataConnectException with {@link ErrorCode#STOP_ERROR_CODE} if stopping fails
+     */
+    @Override
+    public void stop() {
+        if (replicator == null) {
+            // start() never got far enough to create a replicator; nothing to release.
+            return;
+        }
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("activemq task stop failed.", e);
+            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    /** No-op. */
+    @Override
+    public void pause() {
+    }
+
+    /** No-op. */
+    @Override
+    public void resume() {
+    }
+
+    /**
+     * Serialises the body of a JMS message.
+     *
+     * <ul>
+     * <li>Text: UTF-8 bytes of the text; a null body yields an empty buffer.</li>
+     * <li>Object and Map: JSON bytes.</li>
+     * <li>Bytes and Stream: the raw bytes read from the message.</li>
+     * </ul>
+     *
+     * @param message the message to convert
+     * @return a buffer wrapping the serialised body
+     * @throws JMSException if the provider fails while reading the message
+     * @throws RuntimeException if the message is none of the five standard JMS body types
+     */
+    public ByteBuffer getMessageContent(Message message) throws JMSException {
+        byte[] data;
+        if (message instanceof TextMessage) {
+            String text = ((TextMessage) message).getText();
+            data = text == null ? new byte[0] : text.getBytes(UTF_8);
+        } else if (message instanceof ObjectMessage) {
+            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            data = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(data);
+        } else if (message instanceof MapMessage) {
+            MapMessage mapMessage = (MapMessage) message;
+            Map<String, Object> map = new HashMap<>();
+            @SuppressWarnings("unchecked")
+            Enumeration<Object> names = mapMessage.getMapNames();
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                map.put(name, mapMessage.getObject(name));
+            }
+            data = JSON.toJSONBytes(map);
+        } else if (message instanceof StreamMessage) {
+            StreamMessage streamMessage = (StreamMessage) message;
+            ByteArrayOutputStream collected = new ByteArrayOutputStream();
+            byte[] chunk = new byte[1024];
+            int read;
+            while ((read = streamMessage.readBytes(chunk)) != -1) {
+                collected.write(chunk, 0, read);
+            }
+            data = collected.toByteArray();
+        } else {
+            // The exception is printed and does not need to be written as a DataConnectException
+            throw new RuntimeException("message type exception");
+        }
+        return ByteBuffer.wrap(data);
+    }
+
+    /** Describes the configured destination for log output without exposing credentials. */
+    private String describeDestination() {
+        if (config == null) {
+            return "<not configured>";
+        }
+        return config.getDestinationType() + ":" + config.getDestinationName();
+    }
+
+    /**
+     * @param replicator the replicator that will receive consumed messages
+     * @return the broker-specific processor to start
+     */
+    public abstract PatternProcessor getPatternProcessor(Replicator replicator);
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
new file mode 100644
index 0000000..e03691f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+
+/**
+ * Consumes messages from one JMS queue or topic and forwards each of them to a
+ * {@link Replicator}. Subclasses supply the broker-specific {@link ConnectionFactory}.
+ */
+public abstract class PatternProcessor {
+
+    private Replicator replicator;
+
+    Config config;
+
+    private Connection connection;
+
+    private Session session;
+
+    private MessageConsumer consumer;
+
+    /**
+     * @param replicator receiver of consumed messages; also the source of the configuration
+     */
+    public PatternProcessor(Replicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    /**
+     * @return the factory used to open the broker connection
+     */
+    public abstract ConnectionFactory connectionFactory();
+
+    /**
+     * Opens the connection, session and consumer and registers the forwarding listener.
+     * If any step fails, whatever was already opened is closed again before the failure is
+     * rethrown.
+     *
+     * @throws RuntimeException if the destination type is neither "topic" nor "queue"
+     * @throws Exception if the JMS provider fails
+     */
+    public void start() throws Exception {
+        boolean isTopic = StringUtils.equals("topic", config.getDestinationType());
+        boolean isQueue = StringUtils.equals("queue", config.getDestinationType());
+        if (!isTopic && !isQueue) {
+            // RuntimeException is caught by DataConnectException
+            throw new RuntimeException("destination type is incorrectness");
+        }
+
+        ConnectionFactory connectionFactory = connectionFactory();
+        try {
+            if (StringUtils.isNotBlank(config.getUsername())
+                && StringUtils.isNotBlank(config.getPassword())) {
+                connection = connectionFactory.createConnection(config.getUsername(), config.getPassword());
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+            connection.start();
+            // Assign the field, not a local: stop() needs this session to close it.
+            session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+            Destination destination;
+            if (isTopic) {
+                destination = session.createTopic(config.getDestinationName());
+            } else {
+                destination = session.createQueue(config.getDestinationName());
+            }
+            consumer = session.createConsumer(destination, config.getMessageSelector());
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    replicator.commit(message, true);
+                }
+            });
+        } catch (Exception e) {
+            // Do not leave a half-open connection behind when start-up fails midway.
+            try {
+                stop();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the consumer, session and connection that exist; safe to call when
+     * {@link #start()} never ran or failed part-way. The connection is closed even if closing
+     * the consumer or session throws.
+     *
+     * @throws Exception if the JMS provider fails while closing
+     */
+    public void stop() throws Exception {
+        try {
+            if (consumer != null) {
+                consumer.close();
+            }
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            consumer = null;
+            session = null;
+            if (connection != null) {
+                Connection toClose = connection;
+                connection = null;
+                toClose.close();
+            }
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
new file mode 100644
index 0000000..32b8f04
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.lang.reflect.Field;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.junit.Assert;
+
+/** Unit tests for {@link Replicator} using a mocked {@link PatternProcessor}. */
+public class ReplicatorTest {
+
+    Replicator replicator;
+
+    PatternProcessor patternProcessor;
+
+    Config config;
+
+    /** Builds a replicator without an owning task and injects a mock processor reflectively. */
+    @Before
+    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+        config = new Config();
+        replicator = new Replicator(config,null);
+
+        patternProcessor = Mockito.mock(PatternProcessor.class);
+
+        Field processor = Replicator.class.getDeclaredField("processor");
+        processor.setAccessible(true);
+        processor.set(replicator, patternProcessor);
+    }
+
+    /** The replicator was built with a null task, so asking it for a processor must fail. */
+    @Test(expected = RuntimeException.class)
+    public void startTest() throws Exception {
+        replicator.start();
+    }
+
+    /** Stopping the replicator must stop its processor exactly once. */
+    @Test
+    public void stop() throws Exception {
+        replicator.stop();
+        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+    }
+
+    /** A committed message must be the next one available from the queue. */
+    @Test
+    public void commitAddGetQueueTest() {
+        Message message = new ActiveMQTextMessage();
+        replicator.commit(message, false);
+        // JUnit expects (expected, actual); the reverse order yields misleading failure text.
+        Assert.assertEquals(message, replicator.getQueue().poll());
+    }
+
+    /** The replicator must hand back the config it was constructed with. */
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(config, replicator.getConfig());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
new file mode 100644
index 0000000..72e818a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Tests for {@code ActivemqSourceTask}.
+ *
+ * <p>{@link #pollTest()} and {@link #getMessageConnentTest()} are the JUnit tests.
+ * {@link #befores()} and {@link #test()} carry no active JUnit annotation and
+ * talk to a hard-coded remote broker, so they only run when invoked by hand.
+ */
+public class ActivemqSourceTaskTest {
+
+    /**
+     * Manual helper: publishes 20 text messages to {@code test-queue} on the
+     * hard-coded broker inside a transacted session.
+     *
+     * @throws JMSException         if any JMS operation fails
+     * @throws InterruptedException declared by the original signature; kept for compatibility
+     */
+    public void befores() throws JMSException, InterruptedException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue("test-queue");
+
+            MessageProducer producer = session.createProducer(destination);
+
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+                producer.send(message);
+            }
+
+            session.commit();
+            session.close();
+        } finally {
+            // Release the broker connection even when sending or committing fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Manual integration check: starts a task against the hard-coded broker and
+     * polls until at least 20 entries have been received. Deliberately not
+     * annotated with {@code @Test} because it needs a live broker.
+     */
+    //@Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("activemqUrl", "tcp://112.74.48.251:6166");
+        kv.put("destinationType", "queue");
+        kv.put("destinationName", "test-queue");
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        task.start(kv);
+        int received = 0;
+        while (received < 20) {
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            received += sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+        Thread.sleep(20000);
+    }
+
+    /**
+     * With one message queued on the (mocked) replicator, the first poll must
+     * return one entry and the following poll must return none.
+     */
+    @Test
+    public void pollTest() throws Exception {
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText("hello rocketmq");
+
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        // Inject the collaborators reflectively instead of calling start().
+        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = ActivemqSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        config.set(task, new Config());
+
+        queue.put(textMessage);
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(1, list.size());
+
+        list = task.poll();
+        Assert.assertEquals(0, list.size());
+    }
+
+    /**
+     * Checks the bytes produced by {@code getMessageConnent} for text, object,
+     * bytes, map and stream messages, then checks that a {@code null} message is
+     * rejected with a {@link RuntimeException}.
+     *
+     * <p>The exception expectation is scoped to the final call only, so that a
+     * RuntimeException raised by one of the earlier conversions fails the test
+     * instead of satisfying it.
+     */
+    @Test
+    public void getMessageConnentTest() throws JMSException {
+        String value = "hello rocketmq";
+        ActivemqSourceTask task = new ActivemqSourceTask();
+
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText(value);
+        ByteBuffer buffer = task.getMessageConnent(textMessage);
+        Assert.assertEquals(textMessage.getText(), new String(buffer.array()));
+
+        ObjectMessage objectMessage = new ActiveMQObjectMessage();
+        objectMessage.setObject(value);
+        buffer = task.getMessageConnent(objectMessage);
+        // The payload is expected to be the object's text wrapped in double quotes.
+        Assert.assertEquals("\"" + objectMessage.getObject().toString() + "\"", new String(buffer.array()));
+
+        BytesMessage bytes = new ActiveMQBytesMessage();
+        bytes.writeBytes(value.getBytes());
+        bytes.reset();
+        buffer = task.getMessageConnent(bytes);
+        Assert.assertEquals(value, new String(buffer.array()));
+
+        MapMessage mapMessage = new ActiveMQMapMessage();
+        mapMessage.setString("hello", "rocketmq");
+        buffer = task.getMessageConnent(mapMessage);
+        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        Assert.assertEquals("rocketmq", map.get("hello"));
+        Assert.assertEquals(1, map.size());
+
+        StreamMessage streamMessage = new ActiveMQStreamMessage();
+        // Build a long payload; starting from an empty builder avoids the
+        // literal "null" prefix that "null + value" concatenation produces.
+        StringBuilder repeated = new StringBuilder();
+        for (int i = 0; i < 200; i++) {
+            repeated.append(value);
+        }
+        String valueTwo = repeated.toString();
+        streamMessage.writeBytes(valueTwo.getBytes());
+        streamMessage.reset();
+        buffer = task.getMessageConnent(streamMessage);
+        Assert.assertEquals(valueTwo, new String(buffer.array()));
+
+        try {
+            task.getMessageConnent(null);
+            Assert.fail("getMessageConnent(null) should throw a RuntimeException");
+        } catch (RuntimeException expected) {
+            // Expected: a null message is rejected.
+        }
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
new file mode 100644
index 0000000..6c4029a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Unit tests for {@link BaseJmsSourceConnector}, exercised through an anonymous
+ * subclass that supplies the task class and the set of required config keys.
+ */
+public class BaseJmsSourceConnectorTest {
+
+    /** Config keys that the connector under test reports as required. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>();
+
+    static {
+        // Filled in a static block instead of double-brace initialization, which
+        // would create an anonymous HashSet subclass just to hold three values.
+        REQUEST_CONFIG.add("activemqUrl");
+        REQUEST_CONFIG.add("destinationType");
+        REQUEST_CONFIG.add("destinationName");
+    }
+
+    BaseJmsSourceConnector connector = new BaseJmsSourceConnector() {
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return BaseJmsSourceTask.class;
+        }
+
+        @Override
+        Set<String> getRequiredConfig() {
+            return REQUEST_CONFIG;
+        }
+    };
+
+    /**
+     * While a required key is missing, {@code verifyAndSetConfig} must name it in
+     * its message; once every key is present it must return an empty string.
+     *
+     * <p>NOTE(review): the loop walks {@code Config.REQUEST_CONFIG} while the
+     * connector consults the local {@link #REQUEST_CONFIG}; this presumably relies
+     * on both holding the same keys in the same iteration order. Confirm.
+     */
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            // JUnit's contract is assertEquals(expected, actual).
+            assertEquals("Request config key: " + requestKey, connector.verifyAndSetConfig(keyValue));
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+    }
+
+    /** The connector must report the task class supplied by the subclass. */
+    @Test
+    public void taskClassTest() {
+        assertEquals(BaseJmsSourceTask.class, connector.taskClass());
+    }
+
+    /**
+     * Before any config is set the first task config is {@code null}; after
+     * {@code verifyAndSetConfig} it must equal the supplied {@link KeyValue}.
+     */
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(null, connector.taskConfigs().get(0));
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(keyValue, connector.taskConfigs().get(0));
+    }
+}