You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:15:35 UTC

[rocketmq-connect] branch master updated (a1e5f60 -> 0669f46)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git.


    from a1e5f60  Merge branch 'rocketmq_connect1' of https://github.com/apache/rocketmq-externals
     new f4221ce  (1)Rename rocketmq-connector to replicator (2) Initialize RocketMQ activemq connect and runtime
     new c7c0c04  init complete
     new 30b02f4  change package name
     new da0b21f  change
     new 6663768  change
     new d50ffb8  message type
     new 508eea1  add unit test
     new 23a5b24  add licenses
     new db391c5  change exception
     new 3da8a3f  format pom.xml
     new e8aedf3  Change README.md (#301)
     new 0669f46  Add 'connector/rocketmq-connect-activemq/' from commit 'e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f'

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 connector/rocketmq-connect-activemq/README-CN.md   |  16 ++
 connector/rocketmq-connect-activemq/README.md      |  16 ++
 .../rocketmq-connect-activemq}/pom.xml             | 134 +++++++----------
 .../apache/rocketmq/connect/activemq/Config.java   | 109 +++++++++++++-
 .../rocketmq/connect/activemq/ErrorCode.java       |   8 +
 .../rocketmq/connect/activemq/Replicator.java      |  63 ++++++++
 .../connector/ActivemqSourceConnector.java         |  28 ++--
 .../activemq/connector/ActivemqSourceTask.java     | 141 ++++++++++++++++++
 .../connect/activemq/pattern/PatternProcessor.java |  89 +++++++++++
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  74 +++++++++
 .../activemq/connector/ActivemqConnectorTest.java  |  58 ++++++++
 .../activemq/connector/ActivemqSourceTaskTest.java | 165 +++++++++++++++++++++
 12 files changed, 803 insertions(+), 98 deletions(-)
 create mode 100644 connector/rocketmq-connect-activemq/README-CN.md
 create mode 100644 connector/rocketmq-connect-activemq/README.md
 copy {rocketmq-connect-runtime => connector/rocketmq-connect-activemq}/pom.xml (62%)
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileUtils.java => connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java (52%)
 create mode 100644 connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
 create mode 100644 connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java => connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java (72%)
 create mode 100644 connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
 create mode 100644 connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
 create mode 100644 connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
 create mode 100644 connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
 create mode 100644 connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java

[rocketmq-connect] 02/12: init complete

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit c7c0c04a7fd28d1067ec2c8700d404ac8dd2f391
Author: laohu <23...@qq.com>
AuthorDate: Sun Jun 2 21:10:20 2019 +0800

    init complete
---
 pom.xml                                            | 259 +++++++++++++++++++++
 .../java/io/openmessaging/activemq/Config.java     | 133 +++++++++++
 .../java/io/openmessaging/activemq/Replicator.java |  72 ++++++
 .../activemq/connector/ActivemqConnector.java      |  72 ++++++
 .../activemq/connector/ActivemqTask.java           |  87 +++++++
 .../activemq/pattern/PatternProcessor.java         |  83 +++++++
 6 files changed, 706 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b021330
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,259 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.rocketmq</groupId>
+	<artifactId>rocketmq-connect-activemq</artifactId>
+	<version>1.0.0</version>
+
+	<name>connect-activemq</name>
+	<description>Redis Replicator is a redis RDB and Command parser written in java.
+        It can parse,filter,broadcast the RDB and Command events in a real time manner
+        and resent to Apache RocketMQ, then consumer could subscribe topic to receive data.
+    </description>
+	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url>
+
+	<licenses>
+		<license>
+			<name>The Apache Software License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+		</license>
+	</licenses>
+
+	<developers>
+		<developer>
+			<name>Leon Chen</name>
+			<email>chen.bao.yi@gmail.com</email>
+			<organization>moilioncircle</organization>
+			<organizationUrl>http://www.moilioncircle.com/</organizationUrl>
+			<roles>
+				<role>Developer</role>
+			</roles>
+			<timezone>+8</timezone>
+		</developer>
+
+		<developer>
+			<name>Adrian Yao</name>
+			<email>adrianyaofly@gmail.com</email>
+			<organization>unstudy</organization>
+			<timezone>+8</timezone>
+		</developer>
+
+		<developer>
+			<name>Rick Zhang</name>
+			<email>zhangke.huangshan@gmail.com</email>
+			<organization>treefinance.com.cn</organization>
+			<roles>
+				<role>Developer</role>
+			</roles>
+			<timezone>+8</timezone>
+		</developer>
+	</developers>
+
+	<issueManagement>
+		<system>jira</system>
+		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
+	</issueManagement>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+		<!-- Compiler settings properties -->
+		<maven.compiler.source>1.8</maven.compiler.source>
+		<maven.compiler.target>1.8</maven.compiler.target>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>versions-maven-plugin</artifactId>
+				<version>2.3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>clirr-maven-plugin</artifactId>
+				<version>2.7</version>
+			</plugin>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.6.1</version>
+				<configuration>
+					<source>${maven.compiler.source}</source>
+					<target>${maven.compiler.target}</target>
+					<compilerVersion>${maven.compiler.source}</compilerVersion>
+					<showDeprecation>true</showDeprecation>
+					<showWarnings>true</showWarnings>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19.1</version>
+				<configuration>
+					<argLine>-Xms512m -Xmx1024m</argLine>
+					<forkMode>always</forkMode>
+					<includes>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-site-plugin</artifactId>
+				<version>3.6</version>
+				<configuration>
+					<locales>en_US</locales>
+					<outputEncoding>UTF-8</outputEncoding>
+					<inputEncoding>UTF-8</inputEncoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-source-plugin</artifactId>
+				<version>3.0.1</version>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<version>2.10.4</version>
+				<configuration>
+					<charset>UTF-8</charset>
+					<locale>en_US</locale>
+					<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+				</configuration>
+				<executions>
+					<execution>
+						<id>aggregate</id>
+						<goals>
+							<goal>aggregate</goal>
+						</goals>
+						<phase>site</phase>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<version>3.0.2</version>
+				<configuration>
+					<encoding>${project.build.sourceEncoding}</encoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>findbugs-maven-plugin</artifactId>
+				<version>3.0.4</version>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<version>2.6.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>2.6.3</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>4.5.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>io.openmessaging</groupId>
+			<artifactId>openmessaging-connect-runtime</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>io.openmessaging</groupId>
+			<artifactId>openmessaging-connector</artifactId>
+			<version>0.1.0-beta</version>
+		</dependency>
+		<dependency>
+			<groupId>io.openmessaging</groupId>
+			<artifactId>openmessaging-api</artifactId>
+			<version>0.3.1-alpha</version>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>fastjson</artifactId>
+			<version>1.2.51</version>
+		</dependency>
+		<!-- <dependency> <groupId>io.javalin</groupId> <artifactId>javalin</artifactId> 
+			<version>1.3.0</version> </dependency> -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.7.7</version>
+		</dependency>
+		<dependency>
+			<groupId>ch.qos.logback</groupId>
+			<artifactId>logback-classic</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
+			<groupId>ch.qos.logback</groupId>
+			<artifactId>logback-core</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-openmessaging</artifactId>
+			<version>4.3.2</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-all</artifactId>
+			<version>5.9.0</version>
+		</dependency>
+		<dependency>
+			<groupId>javax.jms</groupId>
+			<artifactId>javax.jms-api</artifactId>
+			<version>2.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.glassfish.main.javaee-api</groupId>
+			<artifactId>javax.jms</artifactId>
+			<version>3.1.2.2</version>
+		</dependency>
+
+	</dependencies>
+
+</project>
diff --git a/src/main/java/io/openmessaging/activemq/Config.java b/src/main/java/io/openmessaging/activemq/Config.java
new file mode 100644
index 0000000..10a5d9f
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/Config.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+import io.openmessaging.KeyValue;
+
+public class Config {
+
+	public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+		{
+			add("activemqUrl");
+			add("activemqUsername");
+			add("activemqPassword");
+			add("destinationType");
+			add("destinationName");
+		}
+	};
+
+	public String activemqUrl;
+
+	public String activemqUsername;
+
+	public String activemqPassword;
+
+	public String destinationType;
+
+	public String destinationName;
+
+	public void load(KeyValue props) {
+
+		properties2Object(props, this);
+	}
+
+	private void properties2Object(final KeyValue p, final Object object) {
+
+		Method[] methods = object.getClass().getMethods();
+		for (Method method : methods) {
+			String mn = method.getName();
+			if (mn.startsWith("set")) {
+				try {
+					String tmp = mn.substring(4);
+					String first = mn.substring(3, 4);
+
+					String key = first.toLowerCase() + tmp;
+					String property = p.getString(key);
+					if (property != null) {
+						Class<?>[] pt = method.getParameterTypes();
+						if (pt != null && pt.length > 0) {
+							String cn = pt[0].getSimpleName();
+							Object arg;
+							if (cn.equals("int") || cn.equals("Integer")) {
+								arg = Integer.parseInt(property);
+							} else if (cn.equals("long") || cn.equals("Long")) {
+								arg = Long.parseLong(property);
+							} else if (cn.equals("double") || cn.equals("Double")) {
+								arg = Double.parseDouble(property);
+							} else if (cn.equals("boolean") || cn.equals("Boolean")) {
+								arg = Boolean.parseBoolean(property);
+							} else if (cn.equals("float") || cn.equals("Float")) {
+								arg = Float.parseFloat(property);
+							} else if (cn.equals("String")) {
+								arg = property;
+							} else {
+								continue;
+							}
+							method.invoke(object, arg);
+						}
+					}
+				} catch (Throwable ignored) {
+				}
+			}
+		}
+	}
+
+	public String getActivemqUrl() {
+		return activemqUrl;
+	}
+
+	public void setActivemqUrl(String activemqUrl) {
+		this.activemqUrl = activemqUrl;
+	}
+
+	public String getActivemqUsername() {
+		return activemqUsername;
+	}
+
+	public void setActivemqUsername(String activemqUsername) {
+		this.activemqUsername = activemqUsername;
+	}
+
+	public String getActivemqPassword() {
+		return activemqPassword;
+	}
+
+	public void setActivemqPassword(String activemqPassword) {
+		this.activemqPassword = activemqPassword;
+	}
+
+	public String getDestinationType() {
+		return destinationType;
+	}
+
+	public void setDestinationType(String destinationType) {
+		this.destinationType = destinationType;
+	}
+
+	public String getDestinationName() {
+		return destinationName;
+	}
+
+	public void setDestinationName(String destinationName) {
+		this.destinationName = destinationName;
+	}
+}
\ No newline at end of file
diff --git a/src/main/java/io/openmessaging/activemq/Replicator.java b/src/main/java/io/openmessaging/activemq/Replicator.java
new file mode 100644
index 0000000..51ca6c1
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/Replicator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.openmessaging.activemq.pattern.PatternProcessor;
+
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
+
+    private PatternProcessor processor;
+    
+    private Config config;
+    private Object lock = new Object();
+    private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(Config config){
+        this.config = config;
+    }
+
+    public void start() {
+
+        try {
+            processor = new PatternProcessor(this);
+            processor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+        }
+    }
+
+    public void stop(){
+    	processor.stop();
+    }
+
+    public void commit(Message message, boolean isComplete) {
+        queue.add(message);
+    }
+
+    public Config getConfig() {
+    	return this.config;
+    }
+
+    public BlockingQueue<Message> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
new file mode 100644
index 0000000..1c9a530
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.activemq.Config;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivemqConnector extends SourceConnector {
+
+    private KeyValue config;
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+
+        for(String requestKey : Config.REQUEST_CONFIG){
+            if(!config.containsKey(requestKey)){
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ActivemqTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.config);
+        return config;
+    }
+}
diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
new file mode 100644
index 0000000..a04fc50
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.activemq.connector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.activemq.Config;
+import io.openmessaging.activemq.Replicator;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+
+public class ActivemqTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(ActivemqTask.class);
+
+    private Replicator replicator;
+
+    private Config config;
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+
+        List<SourceDataEntry> res = new ArrayList<>();
+
+        try {
+        	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            SourceDataEntry sourceDataEntry = null;
+            
+            res.add(sourceDataEntry);
+        } catch (Exception e) {
+            log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e);
+        }
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            this.replicator = new Replicator(config);
+        } catch (Exception e) {
+            log.error("Mysql task start failed.", e);
+        }
+        this.replicator.start();
+    }
+
+    @Override
+    public void stop() {
+        replicator.stop();
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+}
diff --git a/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
new file mode 100644
index 0000000..4b4b450
--- /dev/null
+++ b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
@@ -0,0 +1,83 @@
+package io.openmessaging.activemq.pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.lang3.StringUtils;
+
+import io.openmessaging.activemq.Config;
+import io.openmessaging.activemq.Replicator;
+
+public class PatternProcessor {
+
+	private Replicator replicator;
+	
+	private Config config;
+	
+	Connection connection;
+	
+	Session session;
+	
+	MessageConsumer consumer;
+	
+	public PatternProcessor(Replicator replicator) {
+		this.replicator = replicator;
+		this.config = replicator.getConfig();
+	}
+	
+	public void start() {
+		try {
+		   ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
+		   
+	        //2、使用连接工厂创建一个连接对象
+		   if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) {
+	           connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword());
+		   }else {
+			   connection = connectionFactory.createConnection();
+		   }
+	        //3、开启连接
+	        connection.start();
+	        //4、使用连接对象创建会话(session)对象
+	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
+	        Destination destination = null;
+	        if(StringUtils.equals("topic", config.getDestinationType())) {
+	        	destination = session.createTopic(config.getDestinationName());
+	        }else if(StringUtils.equals("queue", config.getDestinationType())){
+	        	destination = session.createQueue(config.getDestinationName());
+	        }else {
+	        	throw new RuntimeException("");
+	        }
+	        consumer = session.createConsumer(destination);
+	        //6、使用会话对象创建生产者对象
+	        //7、向consumer对象中设置一个messageListener对象,用来接收消息
+	        consumer.setMessageListener(new MessageListener() {
+	            @Override
+	            public void onMessage(Message message) {
+	            	replicator.commit(message, true);
+	            }
+	        });
+		}catch(Exception e) {
+			
+		}
+	}
+	
+	public void stop() {
+        try {
+        	consumer.close();
+        	session.close();
+			connection.close();
+		} catch (JMSException e) {
+			e.printStackTrace();
+		}
+	}
+	
+ 
+}

[rocketmq-connect] 10/12: format pom.xml

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 3da8a3fc303cb2dec54dcb1922d84818502ae3eb
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 13 18:32:31 2019 +0800

    format pom.xml
---
 pom.xml                                            | 354 ++++++++++-----------
 .../activemq/connector/ActivemqSourceTask.java     |   6 +-
 2 files changed, 180 insertions(+), 180 deletions(-)

diff --git a/pom.xml b/pom.xml
index 473a663..ccb4118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,191 +11,191 @@
 	language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-	<groupId>org.apache.rocketmq</groupId>
-	<artifactId>rocketmq-connect-activemq</artifactId>
-	<version>1.0.0</version>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-activemq</artifactId>
+    <version>1.0.0</version>
 
-	<name>connect-activemq</name>
-	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url>
+    <name>connect-activemq</name>
+    <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url>
 
-	<licenses>
-		<license>
-			<name>The Apache Software License, Version 2.0</name>
-			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-		</license>
-	</licenses>
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
 
-	<issueManagement>
-		<system>jira</system>
-		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
-	</issueManagement>
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
 
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
-		<!-- Compiler settings properties -->
-		<maven.compiler.source>1.8</maven.compiler.source>
-		<maven.compiler.target>1.8</maven.compiler.target>
-	</properties>
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>versions-maven-plugin</artifactId>
-				<version>2.3</version>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>clirr-maven-plugin</artifactId>
-				<version>2.7</version>
-			</plugin>
-			<plugin>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.6.1</version>
-				<configuration>
-					<source>${maven.compiler.source}</source>
-					<target>${maven.compiler.target}</target>
-					<compilerVersion>${maven.compiler.source}</compilerVersion>
-					<showDeprecation>true</showDeprecation>
-					<showWarnings>true</showWarnings>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.19.1</version>
-				<configuration>
-					<argLine>-Xms512m -Xmx1024m</argLine>
-					<forkMode>always</forkMode>
-					<includes>
-						<include>**/*Test.java</include>
-					</includes>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-site-plugin</artifactId>
-				<version>3.6</version>
-				<configuration>
-					<locales>en_US</locales>
-					<outputEncoding>UTF-8</outputEncoding>
-					<inputEncoding>UTF-8</inputEncoding>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-source-plugin</artifactId>
-				<version>3.0.1</version>
-				<executions>
-					<execution>
-						<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-javadoc-plugin</artifactId>
-				<version>2.10.4</version>
-				<configuration>
-					<charset>UTF-8</charset>
-					<locale>en_US</locale>
-					<excludePackageNames>io.openmessaging.internal</excludePackageNames>
-				</configuration>
-				<executions>
-					<execution>
-						<id>aggregate</id>
-						<goals>
-							<goal>aggregate</goal>
-						</goals>
-						<phase>site</phase>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-resources-plugin</artifactId>
-				<version>3.0.2</version>
-				<configuration>
-					<encoding>${project.build.sourceEncoding}</encoding>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>findbugs-maven-plugin</artifactId>
-				<version>3.0.4</version>
-			</plugin>
-		</plugins>
-	</build>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
 
-	<dependencies>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.11</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.assertj</groupId>
-			<artifactId>assertj-core</artifactId>
-			<version>2.6.0</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-core</artifactId>
-			<version>2.6.3</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>io.openmessaging</groupId>
-			<artifactId>openmessaging-connector</artifactId>
-			<version>0.1.0-beta</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba</groupId>
-			<artifactId>fastjson</artifactId>
-			<version>1.2.51</version>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-api</artifactId>
-			<version>1.7.7</version>
-		</dependency>
-		<dependency>
-			<groupId>ch.qos.logback</groupId>
-			<artifactId>logback-classic</artifactId>
-			<version>1.0.13</version>
-		</dependency>
-		<dependency>
-			<groupId>ch.qos.logback</groupId>
-			<artifactId>logback-core</artifactId>
-			<version>1.0.13</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.rocketmq</groupId>
-			<artifactId>rocketmq-openmessaging</artifactId>
-			<version>4.3.2</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-cli</groupId>
-			<artifactId>commons-cli</artifactId>
-			<version>1.2</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.activemq</groupId>
-			<artifactId>activemq-all</artifactId>
-			<version>5.9.0</version>
-		</dependency>
-		<dependency>
-			<groupId>javax.jms</groupId>
-			<artifactId>javax.jms-api</artifactId>
-			<version>2.0</version>
-		</dependency>
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-all</artifactId>
+            <version>5.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>javax.jms-api</artifactId>
+            <version>2.0</version>
+        </dependency>
 
-	</dependencies>
+    </dependencies>
 
 </project>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 9d1a1aa..c009274 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -61,8 +61,8 @@ public class ActivemqSourceTask extends SourceTask {
         try {
             Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             if (message != null) {
-                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageConnent(message)};
-                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageContent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
                 res.add(sourceDataEntry);
             }
         } catch (Exception e) {
@@ -104,7 +104,7 @@ public class ActivemqSourceTask extends SourceTask {
     }
 
     @SuppressWarnings("unchecked")
-    public ByteBuffer getMessageConnent(Message message) throws JMSException {
+    public ByteBuffer getMessageContent(Message message) throws JMSException {
         byte[] data = null;
         if (message instanceof TextMessage) {
             data = ((TextMessage) message).getText().getBytes();

[rocketmq-connect] 01/12: (1)Rename rocketmq-connector to replicator (2) Initialize RocketMQ activemq connect and runtime

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit f4221ce058986f465671469e46a7570452ef6a24
Author: duheng.dh <du...@alibaba-inc.com>
AuthorDate: Fri May 31 19:08:13 2019 +0800

    (1)Rename rocketmq-connector to replicator (2) Initialize RocketMQ activemq connect and runtime
---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+

[rocketmq-connect] 03/12: change package name

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 30b02f4de300c556f10863e85b64a1eea68165cd
Author: laohu <23...@qq.com>
AuthorDate: Wed Jun 5 08:13:43 2019 +0800

    change package name
---
 pom.xml                                            | 48 ----------------------
 .../apache/rocketmq/connect}/activemq/Config.java  |  2 +-
 .../rocketmq/connect}/activemq/Replicator.java     |  5 +--
 .../activemq/connector/ActivemqConnector.java      |  5 ++-
 .../connect}/activemq/connector/ActivemqTask.java  |  8 ++--
 .../activemq/pattern/PatternProcessor.java         | 11 +++--
 6 files changed, 15 insertions(+), 64 deletions(-)

diff --git a/pom.xml b/pom.xml
index b021330..f22c291 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,10 +20,6 @@
 	<version>1.0.0</version>
 
 	<name>connect-activemq</name>
-	<description>Redis Replicator is a redis RDB and Command parser written in java.
-        It can parse,filter,broadcast the RDB and Command events in a real time manner
-        and resent to Apache RocketMQ, then consumer could subscribe topic to receive data.
-    </description>
 	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url>
 
 	<licenses>
@@ -33,36 +29,6 @@
 		</license>
 	</licenses>
 
-	<developers>
-		<developer>
-			<name>Leon Chen</name>
-			<email>chen.bao.yi@gmail.com</email>
-			<organization>moilioncircle</organization>
-			<organizationUrl>http://www.moilioncircle.com/</organizationUrl>
-			<roles>
-				<role>Developer</role>
-			</roles>
-			<timezone>+8</timezone>
-		</developer>
-
-		<developer>
-			<name>Adrian Yao</name>
-			<email>adrianyaofly@gmail.com</email>
-			<organization>unstudy</organization>
-			<timezone>+8</timezone>
-		</developer>
-
-		<developer>
-			<name>Rick Zhang</name>
-			<email>zhangke.huangshan@gmail.com</email>
-			<organization>treefinance.com.cn</organization>
-			<roles>
-				<role>Developer</role>
-			</roles>
-			<timezone>+8</timezone>
-		</developer>
-	</developers>
-
 	<issueManagement>
 		<system>jira</system>
 		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
@@ -185,12 +151,6 @@
 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.httpcomponents</groupId>
-			<artifactId>httpclient</artifactId>
-			<version>4.5.5</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
 			<groupId>io.openmessaging</groupId>
 			<artifactId>openmessaging-connect-runtime</artifactId>
 			<version>0.0.1-SNAPSHOT</version>
@@ -210,8 +170,6 @@
 			<artifactId>fastjson</artifactId>
 			<version>1.2.51</version>
 		</dependency>
-		<!-- <dependency> <groupId>io.javalin</groupId> <artifactId>javalin</artifactId> 
-			<version>1.3.0</version> </dependency> -->
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
@@ -248,12 +206,6 @@
 			<version>2.0</version>
 		</dependency>
 
-		<dependency>
-			<groupId>org.glassfish.main.javaee-api</groupId>
-			<artifactId>javax.jms</artifactId>
-			<version>3.1.2.2</version>
-		</dependency>
-
 	</dependencies>
 
 </project>
diff --git a/src/main/java/io/openmessaging/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
similarity index 98%
rename from src/main/java/io/openmessaging/activemq/Config.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 10a5d9f..15a95a7 100644
--- a/src/main/java/io/openmessaging/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package io.openmessaging.activemq;
+package org.apache.rocketmq.connect.activemq;
 
 import java.lang.reflect.Method;
 import java.util.HashSet;
diff --git a/src/main/java/io/openmessaging/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
similarity index 94%
rename from src/main/java/io/openmessaging/activemq/Replicator.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 51ca6c1..499beb0 100644
--- a/src/main/java/io/openmessaging/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -15,18 +15,17 @@
  * limitations under the License.
  */
 
-package io.openmessaging.activemq;
+package org.apache.rocketmq.connect.activemq;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jms.Message;
 
+import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.openmessaging.activemq.pattern.PatternProcessor;
-
 public class Replicator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
similarity index 94%
rename from src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
index 1c9a530..17d3efe 100644
--- a/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package io.openmessaging.activemq.connector;
+package org.apache.rocketmq.connect.activemq.connector;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.activemq.Config;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.rocketmq.connect.activemq.Config;
+
 public class ActivemqConnector extends SourceConnector {
 
     private KeyValue config;
diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
similarity index 92%
rename from src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index a04fc50..f871be5 100644
--- a/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package io.openmessaging.activemq.connector;
+package org.apache.rocketmq.connect.activemq.connector;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,14 +24,14 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.Message;
 
+import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.activemq.Config;
-import io.openmessaging.activemq.Replicator;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
 
@@ -66,10 +66,10 @@ public class ActivemqTask extends SourceTask {
             this.config = new Config();
             this.config.load(props);
             this.replicator = new Replicator(config);
+            this.replicator.start();
         } catch (Exception e) {
             log.error("Mysql task start failed.", e);
         }
-        this.replicator.start();
     }
 
     @Override
diff --git a/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
similarity index 91%
rename from src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index 4b4b450..c1b282c 100644
--- a/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -1,4 +1,4 @@
-package io.openmessaging.activemq.pattern;
+package org.apache.rocketmq.connect.activemq.pattern;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -11,9 +11,8 @@ import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang3.StringUtils;
-
-import io.openmessaging.activemq.Config;
-import io.openmessaging.activemq.Replicator;
+import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.Replicator;
 
 public class PatternProcessor {
 
@@ -65,7 +64,7 @@ public class PatternProcessor {
 	            }
 	        });
 		}catch(Exception e) {
-			
+			throw new RuntimeException(e);
 		}
 	}
 	
@@ -75,7 +74,7 @@ public class PatternProcessor {
         	session.close();
 			connection.close();
 		} catch (JMSException e) {
-			e.printStackTrace();
+			throw new RuntimeException(e);
 		}
 	}
 	

[rocketmq-connect] 09/12: change exception

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit db391c5c580d1d251aff442d994c9932af1a35c6
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 13 10:12:29 2019 +0800

    change exception
---
 pom.xml                                            |  5 --
 .../rocketmq/connect/activemq/ErrorCode.java       |  8 +++
 .../rocketmq/connect/activemq/Replicator.java      | 18 +++---
 .../activemq/connector/ActivemqSourceTask.java     | 21 +++++--
 .../connect/activemq/pattern/PatternProcessor.java | 66 ++++++++++------------
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  4 +-
 6 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6e933bd..473a663 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,11 +156,6 @@
 			<version>0.1.0-beta</version>
 		</dependency>
 		<dependency>
-			<groupId>io.openmessaging</groupId>
-			<artifactId>openmessaging-api</artifactId>
-			<version>0.3.1-alpha</version>
-		</dependency>
-		<dependency>
 			<groupId>com.alibaba</groupId>
 			<artifactId>fastjson</artifactId>
 			<version>1.2.51</version>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
new file mode 100644
index 0000000..de3b3f5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.activemq;
+
+public class ErrorCode {
+
+    public static final int START_ERROR_CODE = 10001;
+
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 2b18481..e0ebe12 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import javax.jms.Message;
+
 import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,19 +39,13 @@ public class Replicator {
         this.config = config;
     }
 
-    public void start() {
-
-        try {
-            processor = new PatternProcessor(this);
-            processor.start();
-
-        } catch (Exception e) {
-            LOGGER.error("Start error.", e);
-            throw new RuntimeException(e);
-        }
+    public void start() throws Exception {
+        processor = new PatternProcessor(this);
+        processor.start();
+        LOGGER.info("Replicator start succeed");
     }
 
-    public void stop() {
+    public void stop() throws Exception {
         processor.stop();
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 2140b5f..9d1a1aa 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq.connector;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
 import io.openmessaging.connector.api.source.SourceTask;
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
@@ -38,6 +40,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.ErrorCode;
 import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +61,8 @@ public class ActivemqSourceTask extends SourceTask {
         try {
             Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             if (message != null) {
-                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageConnent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
                 res.add(sourceDataEntry);
             }
         } catch (Exception e) {
@@ -74,15 +78,21 @@ public class ActivemqSourceTask extends SourceTask {
             this.config.load(props);
             this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
             this.replicator = new Replicator(config);
+            this.replicator.start();
         } catch (Exception e) {
-            log.error("Mysql task start failed.", e);
+            log.error("activemq task start failed.", e);
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
         }
-        this.replicator.start();
     }
 
     @Override
     public void stop() {
-        replicator.stop();
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("activemq task stop failed.", e);
+            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+        }
     }
 
     @Override public void pause() {
@@ -123,8 +133,9 @@ public class ActivemqSourceTask extends SourceTask {
             }
             data = bis.toByteArray();
         } else {
+            // The exception is printed and does not need to be written as a DataConnectException
             throw new RuntimeException("message type exception");
         }
-        return data != null ? ByteBuffer.wrap(data) : null;
+        return ByteBuffer.wrap(data);
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index b0836f9..6e39a7e 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -20,11 +20,11 @@ package org.apache.rocketmq.connect.activemq.pattern;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.activemq.Config;
@@ -47,47 +47,43 @@ public class PatternProcessor {
         this.config = replicator.getConfig();
     }
 
-    public void start() {
-        if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) {
+    public void start() throws Exception {
+        if (!StringUtils.equals("topic", config.getDestinationType())
+            && !StringUtils.equals("queue", config.getDestinationType())) {
+            // RuntimeException is caught by DataConnectException
             throw new RuntimeException("destination type is incorrectness");
         }
 
-        try {
-            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
 
-            if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) {
-                connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
-            } else {
-                connection = connectionFactory.createConnection();
-            }
-            connection.start();
-            Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
-            Destination destination = null;
-            if (StringUtils.equals("topic", config.getDestinationType())) {
-                destination = session.createTopic(config.getDestinationName());
-            } else if (StringUtils.equals("queue", config.getDestinationType())) {
-                destination = session.createQueue(config.getDestinationName());
-            }
-            consumer = session.createConsumer(destination, config.getMessageSelector());
-            consumer.setMessageListener(new MessageListener() {
-                @Override
-                public void onMessage(Message message) {
-                    replicator.commit(message, true);
-                }
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        if (StringUtils.isNotBlank(config.getActivemqUsername())
+            && StringUtils.isNotBlank(config.getActivemqPassword())) {
+            connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
+        } else {
+            connection = connectionFactory.createConnection();
+        }
+        connection.start();
+        Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+        Destination destination = null;
+        if (StringUtils.equals("topic", config.getDestinationType())) {
+            destination = session.createTopic(config.getDestinationName());
+        } else if (StringUtils.equals("queue", config.getDestinationType())) {
+            destination = session.createQueue(config.getDestinationName());
         }
+        consumer = session.createConsumer(destination, config.getMessageSelector());
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                replicator.commit(message, true);
+            }
+        });
+
     }
 
-    public void stop() {
-        try {
-            consumer.close();
-            session.close();
-            connection.close();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
+    public void stop() throws Exception {
+        consumer.close();
+        session.close();
+        connection.close();
     }
 
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index 28cafa4..b94237e 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -50,12 +50,12 @@ public class ReplicatorTest {
     }
 
     @Test(expected = RuntimeException.class)
-    public void startTest() {
+    public void startTest() throws Exception {
         replicator.start();
     }
 
     @Test
-    public void stop() {
+    public void stop() throws Exception {
         replicator.stop();
         Mockito.verify(patternProcessor, Mockito.times(1)).stop();
     }

[rocketmq-connect] 07/12: add unit test

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 508eea1dc30c0b64abcadd9eb6083cb06ee57190
Author: laohu <23...@qq.com>
AuthorDate: Wed Jun 12 00:53:24 2019 +0800

    add unit test
---
 .../apache/rocketmq/connect/activemq/Config.java   | 276 ++++++++++-----------
 .../rocketmq/connect/activemq/Replicator.java      |  13 +-
 ...Connector.java => ActivemqSourceConnector.java} |   9 +-
 .../{ActivemqTask.java => ActivemqSourceTask.java} | 100 ++++----
 .../connect/activemq/pattern/PatternProcessor.java | 116 +++++----
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  57 +++++
 .../activemq/connector/ActivemqConnectorTest.java  |  41 +++
 .../activemq/connector/ActivemqSourceTaskTest.java | 148 +++++++++++
 .../activemq/connector/ActivemqTaskTest.java       |  68 -----
 9 files changed, 496 insertions(+), 332 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index af218c0..30f898d 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -17,151 +17,147 @@
 
 package org.apache.rocketmq.connect.activemq;
 
+import io.openmessaging.KeyValue;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
-
 import javax.jms.Session;
 
-import io.openmessaging.KeyValue;
-
 public class Config {
 
-	@SuppressWarnings("serial")
-	public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
-		{
-			add("activemqUrl");
-			add("destinationType");
-			add("destinationName");
-		}
-	};
-
-	public String activemqUrl;
-
-	public String activemqUsername;
-
-	public String activemqPassword;
-
-	public String destinationType;
-
-	public String destinationName;
-	
-	public String messageSelector;
-	
-	private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-	private Boolean sessionTransacted = Boolean.FALSE;
-	
-	public void load(KeyValue props) {
-
-		properties2Object(props, this);
-	}
-
-	private void properties2Object(final KeyValue p, final Object object) {
-
-		Method[] methods = object.getClass().getMethods();
-		for (Method method : methods) {
-			String mn = method.getName();
-			if (mn.startsWith("set")) {
-				try {
-					String tmp = mn.substring(4);
-					String first = mn.substring(3, 4);
-
-					String key = first.toLowerCase() + tmp;
-					String property = p.getString(key);
-					if (property != null) {
-						Class<?>[] pt = method.getParameterTypes();
-						if (pt != null && pt.length > 0) {
-							String cn = pt[0].getSimpleName();
-							Object arg;
-							if (cn.equals("int") || cn.equals("Integer")) {
-								arg = Integer.parseInt(property);
-							} else if (cn.equals("long") || cn.equals("Long")) {
-								arg = Long.parseLong(property);
-							} else if (cn.equals("double") || cn.equals("Double")) {
-								arg = Double.parseDouble(property);
-							} else if (cn.equals("boolean") || cn.equals("Boolean")) {
-								arg = Boolean.parseBoolean(property);
-							} else if (cn.equals("float") || cn.equals("Float")) {
-								arg = Float.parseFloat(property);
-							} else if (cn.equals("String")) {
-								arg = property;
-							} else {
-								continue;
-							}
-							method.invoke(object, arg);
-						}
-					}
-				} catch (Throwable ignored) {
-				}
-			}
-		}
-	}
-
-	public String getActivemqUrl() {
-		return activemqUrl;
-	}
-
-	public void setActivemqUrl(String activemqUrl) {
-		this.activemqUrl = activemqUrl;
-	}
-
-	public String getActivemqUsername() {
-		return activemqUsername;
-	}
-
-	public void setActivemqUsername(String activemqUsername) {
-		this.activemqUsername = activemqUsername;
-	}
-
-	public String getActivemqPassword() {
-		return activemqPassword;
-	}
-
-	public void setActivemqPassword(String activemqPassword) {
-		this.activemqPassword = activemqPassword;
-	}
-
-	public String getDestinationType() {
-		return destinationType;
-	}
-
-	public void setDestinationType(String destinationType) {
-		this.destinationType = destinationType;
-	}
-
-	public String getDestinationName() {
-		return destinationName;
-	}
-
-	public void setDestinationName(String destinationName) {
-		this.destinationName = destinationName;
-	}
-
-	public String getMessageSelector() {
-		return messageSelector;
-	}
-
-	public void setMessageSelector(String messageSelector) {
-		this.messageSelector = messageSelector;
-	}
-
-	public Integer getSessionAcknowledgeMode() {
-		return sessionAcknowledgeMode;
-	}
-
-	public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
-		this.sessionAcknowledgeMode = sessionAcknowledgeMode;
-	}
-
-	public Boolean getSessionTransacted() {
-		return sessionTransacted;
-	}
-
-	public void setSessionTransacted(Boolean sessionTransacted) {
-		this.sessionTransacted = sessionTransacted;
-	}
-	
-	
-	
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("activemqUrl");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String activemqUrl;
+
+    public String activemqUsername;
+
+    public String activemqPassword;
+
+    public String destinationType;
+
+    public String destinationName;
+
+    public String messageSelector;
+
+    private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+    private Boolean sessionTransacted = Boolean.FALSE;
+
+    public void load(KeyValue props) {
+
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public String getActivemqUrl() {
+        return activemqUrl;
+    }
+
+    public void setActivemqUrl(String activemqUrl) {
+        this.activemqUrl = activemqUrl;
+    }
+
+    public String getActivemqUsername() {
+        return activemqUsername;
+    }
+
+    public void setActivemqUsername(String activemqUsername) {
+        this.activemqUsername = activemqUsername;
+    }
+
+    public String getActivemqPassword() {
+        return activemqPassword;
+    }
+
+    public void setActivemqPassword(String activemqPassword) {
+        this.activemqPassword = activemqPassword;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public Integer getSessionAcknowledgeMode() {
+        return sessionAcknowledgeMode;
+    }
+
+    public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+        this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+    }
+
+    public Boolean getSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index dd83c37..2b18481 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.connect.activemq;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import javax.jms.Message;
-
 import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +29,11 @@ public class Replicator {
     private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
 
     private PatternProcessor processor;
-    
+
     private Config config;
     private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
 
-    public Replicator(Config config){
+    public Replicator(Config config) {
         this.config = config;
     }
 
@@ -47,11 +45,12 @@ public class Replicator {
 
         } catch (Exception e) {
             LOGGER.error("Start error.", e);
+            throw new RuntimeException(e);
         }
     }
 
-    public void stop(){
-    	processor.stop();
+    public void stop() {
+        processor.stop();
     }
 
     public void commit(Message message, boolean isComplete) {
@@ -59,7 +58,7 @@ public class Replicator {
     }
 
     public Config getConfig() {
-    	return this.config;
+        return this.config;
     }
 
     public BlockingQueue<Message> getQueue() {
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
similarity index 89%
rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
index 17d3efe..7e6290b 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
@@ -22,18 +22,17 @@ import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.rocketmq.connect.activemq.Config;
 
-public class ActivemqConnector extends SourceConnector {
+public class ActivemqSourceConnector extends SourceConnector {
 
     private KeyValue config;
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        for(String requestKey : Config.REQUEST_CONFIG){
-            if(!config.containsKey(requestKey)){
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
         }
@@ -61,7 +60,7 @@ public class ActivemqConnector extends SourceConnector {
 
     @Override
     public Class<? extends Task> taskClass() {
-        return ActivemqTask.class;
+        return ActivemqSourceTask.class;
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
similarity index 55%
rename from src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
rename to src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 7dead7b..2140b5f 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -17,7 +17,10 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
-import java.io.ByteArrayInputStream;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -27,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -35,40 +37,32 @@ import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-
 import org.apache.rocketmq.connect.activemq.Config;
 import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-
-public class ActivemqTask extends SourceTask {
+public class ActivemqSourceTask extends SourceTask {
 
-    private static final Logger log = LoggerFactory.getLogger(ActivemqTask.class);
+    private static final Logger log = LoggerFactory.getLogger(ActivemqSourceTask.class);
 
     private Replicator replicator;
 
     private Config config;
-    
+
     private ByteBuffer sourcePartition;
 
-    
-	@Override
+    @Override
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
         try {
-        	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-        	if(message != null) {        		
-        		SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
-        		res.add(sourceDataEntry);
-        	}
+            Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+                res.add(sourceDataEntry);
+            }
         } catch (Exception e) {
-            log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e);
+            log.error("activemq task poll error, current config:" + JSON.toJSONString(config), e);
         }
         return res;
     }
@@ -98,39 +92,39 @@ public class ActivemqTask extends SourceTask {
     @Override public void resume() {
 
     }
-    
+
     @SuppressWarnings("unchecked")
-    public ByteBuffer getMessageConnent(Message message ) throws JMSException {
-    	byte[] data = null;
-		if(message instanceof TextMessage) {
-			data = ((TextMessage) message).getText().getBytes();
-		}else if(message instanceof ObjectMessage) {
-			data = JSON.toJSONBytes( ((ObjectMessage) message).getObject());
-		}else if(message instanceof BytesMessage) {
-			BytesMessage bytesMessage = (BytesMessage)message;
-			data = new byte[(int) bytesMessage.getBodyLength()];
-			bytesMessage.readBytes(data);
-		}else if(message instanceof MapMessage) {
-			MapMessage mapMessage = (MapMessage)message;
-			Map<String,Object> map = new HashMap<>();
-			Enumeration<Object> names = mapMessage.getMapNames();
-			while(names.hasMoreElements()) {
-				String name = names.nextElement().toString();
-				map.put(name, mapMessage.getObject(name));
-			}
-			data = JSON.toJSONBytes(map);
-		}else if(message instanceof StreamMessage) {
-			StreamMessage streamMessage = (StreamMessage)message;
-			ByteArrayOutputStream bis = new ByteArrayOutputStream();
-			byte[] by = new byte[1024];
-			int i = 0;
-			while( (i = streamMessage.readBytes(by)) != 0) {
-				bis.write(by, 0, i);
-			}
-			data = bis.toByteArray();
-		}else {
-			throw new RuntimeException("message type exception");
-		}
-		return data!=null ? ByteBuffer.wrap( data ) : null;
+    public ByteBuffer getMessageConnent(Message message) throws JMSException {
+        byte[] data = null;
+        if (message instanceof TextMessage) {
+            data = ((TextMessage) message).getText().getBytes();
+        } else if (message instanceof ObjectMessage) {
+            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            data = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(data);
+        } else if (message instanceof MapMessage) {
+            MapMessage mapMessage = (MapMessage) message;
+            Map<String, Object> map = new HashMap<>();
+            Enumeration<Object> names = mapMessage.getMapNames();
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                map.put(name, mapMessage.getObject(name));
+            }
+            data = JSON.toJSONBytes(map);
+        } else if (message instanceof StreamMessage) {
+            StreamMessage streamMessage = (StreamMessage) message;
+            ByteArrayOutputStream bis = new ByteArrayOutputStream();
+            byte[] by = new byte[1024];
+            int i = 0;
+            while ((i = streamMessage.readBytes(by)) != -1) {
+                bis.write(by, 0, i);
+            }
+            data = bis.toByteArray();
+        } else {
+            throw new RuntimeException("message type exception");
+        }
+        return data != null ? ByteBuffer.wrap(data) : null;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index b26bfb9..60a34cf 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -8,7 +8,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.activemq.Config;
@@ -16,63 +15,62 @@ import org.apache.rocketmq.connect.activemq.Replicator;
 
 public class PatternProcessor {
 
-	private Replicator replicator;
-	
-	private Config config;
-	
-	Connection connection;
-	
-	Session session;
-	
-	MessageConsumer consumer;
-	
-	public PatternProcessor(Replicator replicator) {
-		this.replicator = replicator;
-		this.config = replicator.getConfig();
-	}
-	
-	public void start() {
-		if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) {
-			throw new RuntimeException("destination type is incorrectness");
-		}
-		
-		try {
-		   ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
-		   
-		   if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) {
-	           connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword());
-		   }else {
-			   connection = connectionFactory.createConnection();
-		   }
-	        connection.start();
-	        Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
-	        Destination destination = null;
-	        if(StringUtils.equals("topic", config.getDestinationType())) {
-	        	destination = session.createTopic(config.getDestinationName());
-	        }else if(StringUtils.equals("queue", config.getDestinationType())){
-	        	destination = session.createQueue(config.getDestinationName());
-	        }
-	        consumer = session.createConsumer(destination, config.getMessageSelector());
-	        consumer.setMessageListener(new MessageListener() {
-	            @Override
-	            public void onMessage(Message message) {
-	            	replicator.commit(message, true);
-	            }
-	        });
-		}catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-	
-	public void stop() {
+    private Replicator replicator;
+
+    private Config config;
+
+    Connection connection;
+
+    Session session;
+
+    MessageConsumer consumer;
+
+    public PatternProcessor(Replicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    public void start() {
+        if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) {
+            throw new RuntimeException("destination type is incorrectness");
+        }
+
+        try {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
+
+            if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) {
+                connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+            connection.start();
+            Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+            Destination destination = null;
+            if (StringUtils.equals("topic", config.getDestinationType())) {
+                destination = session.createTopic(config.getDestinationName());
+            } else if (StringUtils.equals("queue", config.getDestinationType())) {
+                destination = session.createQueue(config.getDestinationName());
+            }
+            consumer = session.createConsumer(destination, config.getMessageSelector());
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    replicator.commit(message, true);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void stop() {
         try {
-        	consumer.close();
-        	session.close();
-			connection.close();
-		} catch (JMSException e) {
-			throw new RuntimeException(e);
-		}
-	}
-	
- 
+            consumer.close();
+            session.close();
+            connection.close();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
new file mode 100644
index 0000000..909a5d7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.connect.activemq;
+
+import java.lang.reflect.Field;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.junit.Assert;
+
+public class ReplicatorTest {
+
+    Replicator replicator;
+
+    PatternProcessor patternProcessor;
+
+    Config config;
+
+    @Before
+    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+        config = new Config();
+        replicator = new Replicator(config);
+
+        patternProcessor = Mockito.mock(PatternProcessor.class);
+
+        Field processor = Replicator.class.getDeclaredField("processor");
+        processor.setAccessible(true);
+        processor.set(replicator, patternProcessor);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void startTest() {
+        replicator.start();
+    }
+
+    @Test
+    public void stop() {
+        replicator.stop();
+        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+    }
+
+    @Test
+    public void commitAddGetQueueTest() {
+        Message message = new ActiveMQTextMessage();
+        replicator.commit(message, false);
+        Assert.assertEquals(replicator.getQueue().poll(), message);
+    }
+
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(replicator.getConfig(), config);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
new file mode 100644
index 0000000..eae1ae6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
@@ -0,0 +1,41 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.activemq.Config;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class ActivemqConnectorTest {
+
+    ActivemqSourceConnector connector = new ActivemqSourceConnector();
+
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals(connector.verifyAndSetConfig(keyValue), "");
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(connector.taskClass(), ActivemqSourceTask.class);
+    }
+
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(connector.taskConfigs().get(0), null);
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(connector.taskConfigs().get(0), keyValue);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
new file mode 100644
index 0000000..2b0821b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
@@ -0,0 +1,148 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.Replicator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class ActivemqSourceTaskTest {
+
+    public void befores() throws JMSException, InterruptedException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
+        Connection connection = connectionFactory.createConnection();
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("test-queue");
+
+        MessageProducer producer = session.createProducer(destination);
+
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        for (int i = 0; i < 20; i++) {
+            TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+            producer.send(message);
+        }
+
+        session.commit();
+        session.close();
+        connection.close();
+    }
+
+    //@Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("activemqUrl", "tcp://112.74.48.251:6166");
+        kv.put("destinationType", "queue");
+        kv.put("destinationName", "test-queue");
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        task.start(kv);
+        for (int i = 0; i < 20; ) {
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            i = i + sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+        Thread.sleep(20000);
+    }
+
+    @Test
+    public void pollTest() throws Exception {
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText("hello rocketmq");
+
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = ActivemqSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        config.set(task, new Config());
+
+        queue.put(textMessage);
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(list.size(), 1);
+
+        list = task.poll();
+        Assert.assertEquals(list.size(), 0);
+
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void getMessageConnentTest() throws JMSException {
+        String value = "hello rocketmq";
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText(value);
+        ByteBuffer buffer = task.getMessageConnent(textMessage);
+        Assert.assertEquals(new String(buffer.array()), textMessage.getText());
+
+        ObjectMessage objectMessage = new ActiveMQObjectMessage();
+        objectMessage.setObject(value);
+        buffer = task.getMessageConnent(objectMessage);
+        Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
+
+        BytesMessage bytes = new ActiveMQBytesMessage();
+        bytes.writeBytes(value.getBytes());
+        bytes.reset();
+        buffer = task.getMessageConnent(bytes);
+        Assert.assertEquals(new String(buffer.array()), value);
+
+        MapMessage mapMessage = new ActiveMQMapMessage();
+        mapMessage.setString("hello", "rocketmq");
+        buffer = task.getMessageConnent(mapMessage);
+        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        Assert.assertEquals(map.get("hello"), "rocketmq");
+        Assert.assertEquals(map.size(), 1);
+
+        StreamMessage streamMessage = new ActiveMQStreamMessage();
+        String valueTwo = null;
+        for (int i = 0; i < 200; i++) {
+            valueTwo = valueTwo + value;
+        }
+        streamMessage.writeBytes(valueTwo.getBytes());
+        streamMessage.reset();
+        buffer = task.getMessageConnent(streamMessage);
+        Assert.assertEquals(new String(buffer.array()), valueTwo);
+
+        task.getMessageConnent(null);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
deleted file mode 100644
index 780cbc9..0000000
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.rocketmq.connect.activemq.connector;
-
-import java.util.Collection;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.rocketmq.connect.activemq.Config;
-import org.junit.Before;
-import org.junit.Test;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.internal.DefaultKeyValue;
-
-public class ActivemqTaskTest {
-
-	@Before
-	public void befores() throws JMSException, InterruptedException {
-		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
-		Connection connection = connectionFactory.createConnection();
-
-		connection.start();
-		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-		Destination destination = session.createQueue("test-queue");
-
-		MessageProducer producer = session.createProducer(destination);
-
-		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-		for (int i = 0; i < 20; i++) {
-			TextMessage message = session.createTextMessage("hello 我是消息:" + i);
-			producer.send(message);
-		}
-
-		session.commit();
-		session.close();
-		connection.close();
-	}
-
-	@Test
-	public void nullTest() {
-		
-	}
-	
-	@Test
-	public void test() throws InterruptedException {
-		KeyValue kv = new DefaultKeyValue();
-		kv.put("activemqUrl", "tcp://112.74.48.251:6166");
-		kv.put("destinationType", "queue");
-		kv.put("destinationName", "test-queue");
-		ActivemqTask task = new ActivemqTask();
-		task.start(kv);
-		for(int i = 0 ; i < 20;) {
-			Collection<SourceDataEntry> sourceDataEntry = task.poll();
-			i = i+sourceDataEntry.size();
-			System.out.println(sourceDataEntry);
-		}
-		Thread.sleep(20000);
-		
-	}
-}

[rocketmq-connect] 06/12: message type

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d50ffb8ed6ddef82e24a4071c03a96ee3e4f1c84
Author: laohu <23...@qq.com>
AuthorDate: Tue Jun 11 08:36:05 2019 +0800

    message type
---
 pom.xml                                            |  5 --
 .../connect/activemq/connector/ActivemqTask.java   | 63 +++++++++++++++++++---
 .../activemq/connector/ActivemqTaskTest.java       |  5 ++
 3 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index f22c291..6e933bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,11 +152,6 @@
 		</dependency>
 		<dependency>
 			<groupId>io.openmessaging</groupId>
-			<artifactId>openmessaging-connect-runtime</artifactId>
-			<version>0.0.1-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>io.openmessaging</groupId>
 			<artifactId>openmessaging-connector</artifactId>
 			<version>0.1.0-beta</version>
 		</dependency>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index c2950fa..7dead7b 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,13 +17,24 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
 
 import org.apache.rocketmq.connect.activemq.Config;
 import org.apache.rocketmq.connect.activemq.Replicator;
@@ -33,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
 
@@ -44,17 +54,19 @@ public class ActivemqTask extends SourceTask {
     private Replicator replicator;
 
     private Config config;
+    
+    private ByteBuffer sourcePartition;
 
-    @Override
+    
+	@Override
     public Collection<SourceDataEntry> poll() {
-
         List<SourceDataEntry> res = new ArrayList<>();
-
         try {
         	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-            SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
-            
-            res.add(sourceDataEntry);
+        	if(message != null) {        		
+        		SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+        		res.add(sourceDataEntry);
+        	}
         } catch (Exception e) {
             log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e);
         }
@@ -63,10 +75,10 @@ public class ActivemqTask extends SourceTask {
 
     @Override
     public void start(KeyValue props) {
-
         try {
             this.config = new Config();
             this.config.load(props);
+            this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
             this.replicator = new Replicator(config);
         } catch (Exception e) {
             log.error("Mysql task start failed.", e);
@@ -86,4 +98,39 @@ public class ActivemqTask extends SourceTask {
     @Override public void resume() {
 
     }
+    
+    @SuppressWarnings("unchecked")
+    public ByteBuffer getMessageConnent(Message message ) throws JMSException {
+    	byte[] data = null;
+		if(message instanceof TextMessage) {
+			data = ((TextMessage) message).getText().getBytes();
+		}else if(message instanceof ObjectMessage) {
+			data = JSON.toJSONBytes( ((ObjectMessage) message).getObject());
+		}else if(message instanceof BytesMessage) {
+			BytesMessage bytesMessage = (BytesMessage)message;
+			data = new byte[(int) bytesMessage.getBodyLength()];
+			bytesMessage.readBytes(data);
+		}else if(message instanceof MapMessage) {
+			MapMessage mapMessage = (MapMessage)message;
+			Map<String,Object> map = new HashMap<>();
+			Enumeration<Object> names = mapMessage.getMapNames();
+			while(names.hasMoreElements()) {
+				String name = names.nextElement().toString();
+				map.put(name, mapMessage.getObject(name));
+			}
+			data = JSON.toJSONBytes(map);
+		}else if(message instanceof StreamMessage) {
+			StreamMessage streamMessage = (StreamMessage)message;
+			ByteArrayOutputStream bis = new ByteArrayOutputStream();
+			byte[] by = new byte[1024];
+			int i = 0;
+			while( (i = streamMessage.readBytes(by)) != 0) {
+				bis.write(by, 0, i);
+			}
+			data = bis.toByteArray();
+		}else {
+			throw new RuntimeException("message type exception");
+		}
+		return data!=null ? ByteBuffer.wrap( data ) : null;
+    }
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
index 5dcb6a7..780cbc9 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
@@ -45,6 +45,11 @@ public class ActivemqTaskTest {
 	}
 
 	@Test
+	public void nullTest() {
+		
+	}
+	
+	@Test
 	public void test() throws InterruptedException {
 		KeyValue kv = new DefaultKeyValue();
 		kv.put("activemqUrl", "tcp://112.74.48.251:6166");

[rocketmq-connect] 04/12: change

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit da0b21fb846de119897f6086737be5de34f1cbac
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 6 14:36:18 2019 +0800

    change
---
 README-CN.md                                       | 16 +++++++++
 README.md                                          | 16 ++++++++-
 .../apache/rocketmq/connect/activemq/Config.java   | 38 ++++++++++++++++++++--
 .../rocketmq/connect/activemq/Replicator.java      |  3 --
 .../connect/activemq/connector/ActivemqTask.java   | 10 ++++--
 .../connect/activemq/pattern/PatternProcessor.java | 16 ++++-----
 6 files changed, 81 insertions(+), 18 deletions(-)

diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..bd745e2
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActiveConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|--- |---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 |  无
+activemq.password|  密码    | 否  | 无
+jms.destination.name | 读取的队列或者主题名   |  是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器    |  否  |无
+jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e965865 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,15 @@
-
+参数 | 作用 | 
+---|--- |---
+activemq.url | activemq ip与端口号
+activemq.username | 用户名
+activemq.password|  密码
+jms.destination.name | 读取的队列或者主题名
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题)
+jms.message.selector | 过滤器
+jms.session.acknowledge.mode | 消息确认
+jms.session.transacted | 是否是事务会话
+rocketmq.topic        | 发送的topic
+rocketmq.name         | broker的用户名
+rocketmq.sk |           
+rocketmq.ak |
+rocketmq.nameserver |  nameserver url
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 15a95a7..af218c0 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -21,15 +21,16 @@ import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
 
+import javax.jms.Session;
+
 import io.openmessaging.KeyValue;
 
 public class Config {
 
+	@SuppressWarnings("serial")
 	public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
 		{
 			add("activemqUrl");
-			add("activemqUsername");
-			add("activemqPassword");
 			add("destinationType");
 			add("destinationName");
 		}
@@ -44,7 +45,13 @@ public class Config {
 	public String destinationType;
 
 	public String destinationName;
+	
+	public String messageSelector;
+	
+	private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
 
+	private Boolean sessionTransacted = Boolean.FALSE;
+	
 	public void load(KeyValue props) {
 
 		properties2Object(props, this);
@@ -130,4 +137,31 @@ public class Config {
 	public void setDestinationName(String destinationName) {
 		this.destinationName = destinationName;
 	}
+
+	public String getMessageSelector() {
+		return messageSelector;
+	}
+
+	public void setMessageSelector(String messageSelector) {
+		this.messageSelector = messageSelector;
+	}
+
+	public Integer getSessionAcknowledgeMode() {
+		return sessionAcknowledgeMode;
+	}
+
+	public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+		this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+	}
+
+	public Boolean getSessionTransacted() {
+		return sessionTransacted;
+	}
+
+	public void setSessionTransacted(Boolean sessionTransacted) {
+		this.sessionTransacted = sessionTransacted;
+	}
+	
+	
+	
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 499beb0..dd83c37 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -30,12 +30,9 @@ public class Replicator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
 
-    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
-
     private PatternProcessor processor;
     
     private Config config;
-    private Object lock = new Object();
     private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
 
     public Replicator(Config config){
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index f871be5..9743a19 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
 
@@ -50,7 +52,11 @@ public class ActivemqTask extends SourceTask {
 
         try {
         	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-            SourceDataEntry sourceDataEntry = null;
+        	DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null);
+        	dataEntryBuilder.timestamp(System.currentTimeMillis());
+            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                    ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")),
+                    ByteBuffer.wrap(JSON.toJSONBytes(message)));
             
             res.add(sourceDataEntry);
         } catch (Exception e) {
@@ -66,10 +72,10 @@ public class ActivemqTask extends SourceTask {
             this.config = new Config();
             this.config.load(props);
             this.replicator = new Replicator(config);
-            this.replicator.start();
         } catch (Exception e) {
             log.error("Mysql task start failed.", e);
         }
+        this.replicator.start();
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index c1b282c..b26bfb9 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -32,31 +32,27 @@ public class PatternProcessor {
 	}
 	
 	public void start() {
+		if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) {
+			throw new RuntimeException("destination type is incorrectness");
+		}
+		
 		try {
 		   ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
 		   
-	        //2、使用连接工厂创建一个连接对象
 		   if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) {
 	           connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword());
 		   }else {
 			   connection = connectionFactory.createConnection();
 		   }
-	        //3、开启连接
 	        connection.start();
-	        //4、使用连接对象创建会话(session)对象
-	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-	        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
+	        Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
 	        Destination destination = null;
 	        if(StringUtils.equals("topic", config.getDestinationType())) {
 	        	destination = session.createTopic(config.getDestinationName());
 	        }else if(StringUtils.equals("queue", config.getDestinationType())){
 	        	destination = session.createQueue(config.getDestinationName());
-	        }else {
-	        	throw new RuntimeException("");
 	        }
-	        consumer = session.createConsumer(destination);
-	        //6、使用会话对象创建生产者对象
-	        //7、向consumer对象中设置一个messageListener对象,用来接收消息
+	        consumer = session.createConsumer(destination, config.getMessageSelector());
 	        consumer.setMessageListener(new MessageListener() {
 	            @Override
 	            public void onMessage(Message message) {

[rocketmq-connect] 05/12: change

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 666376814f146732aec5be3251e4f75f170654cb
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 6 19:36:31 2019 +0800

    change
---
 .../connect/activemq/connector/ActivemqTask.java   |  6 +--
 .../activemq/connector/ActivemqTaskTest.java       | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index 9743a19..c2950fa 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -52,11 +52,7 @@ public class ActivemqTask extends SourceTask {
 
         try {
         	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-        	DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null);
-        	dataEntryBuilder.timestamp(System.currentTimeMillis());
-            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                    ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")),
-                    ByteBuffer.wrap(JSON.toJSONBytes(message)));
+            SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
             
             res.add(sourceDataEntry);
         } catch (Exception e) {
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
new file mode 100644
index 0000000..5dcb6a7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
@@ -0,0 +1,63 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import java.util.Collection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.rocketmq.connect.activemq.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class ActivemqTaskTest {
+
+	@Before
+	public void befores() throws JMSException, InterruptedException {
+		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
+		Connection connection = connectionFactory.createConnection();
+
+		connection.start();
+		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+		Destination destination = session.createQueue("test-queue");
+
+		MessageProducer producer = session.createProducer(destination);
+
+		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		for (int i = 0; i < 20; i++) {
+			TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+			producer.send(message);
+		}
+
+		session.commit();
+		session.close();
+		connection.close();
+	}
+
+	@Test
+	public void test() throws InterruptedException {
+		KeyValue kv = new DefaultKeyValue();
+		kv.put("activemqUrl", "tcp://112.74.48.251:6166");
+		kv.put("destinationType", "queue");
+		kv.put("destinationName", "test-queue");
+		ActivemqTask task = new ActivemqTask();
+		task.start(kv);
+		for(int i = 0 ; i < 20;) {
+			Collection<SourceDataEntry> sourceDataEntry = task.poll();
+			i = i+sourceDataEntry.size();
+			System.out.println(sourceDataEntry);
+		}
+		Thread.sleep(20000);
+		
+	}
+}

[rocketmq-connect] 08/12: add licenses

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 23a5b2490f690385a8477ca47b19e0c48a75e9b3
Author: laohu <23...@qq.com>
AuthorDate: Wed Jun 12 09:09:43 2019 +0800

    add licenses
---
 .../connect/activemq/pattern/PatternProcessor.java      | 17 +++++++++++++++++
 .../rocketmq/connect/activemq/ReplicatorTest.java       | 17 +++++++++++++++++
 .../activemq/connector/ActivemqConnectorTest.java       | 17 +++++++++++++++++
 .../activemq/connector/ActivemqSourceTaskTest.java      | 17 +++++++++++++++++
 4 files changed, 68 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index 60a34cf..b0836f9 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.activemq.pattern;
 
 import javax.jms.Connection;
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index 909a5d7..28cafa4 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.activemq;
 
 import java.lang.reflect.Field;
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
index eae1ae6..22b301c 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.activemq.connector;
 
 import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
index 2b0821b..a71050c 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.activemq.connector;
 
 import java.lang.reflect.Field;

[rocketmq-connect] 12/12: Add 'connector/rocketmq-connect-activemq/' from commit 'e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f'

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 0669f46a5018a92c4ad08fc6704dfa6606eb78ab
Merge: a1e5f60 e8aedf3
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:04:26 2022 +0800

    Add 'connector/rocketmq-connect-activemq/' from commit 'e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f'
    
    git-subtree-dir: connector/rocketmq-connect-activemq
    git-subtree-mainline: a1e5f6084d893257cbe861e9ee8d25894200bb7d
    git-subtree-split: e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f

 connector/rocketmq-connect-activemq/README-CN.md   |  16 ++
 connector/rocketmq-connect-activemq/README.md      |  16 ++
 connector/rocketmq-connect-activemq/pom.xml        | 201 +++++++++++++++++++++
 .../apache/rocketmq/connect/activemq/Config.java   | 163 +++++++++++++++++
 .../rocketmq/connect/activemq/ErrorCode.java       |   8 +
 .../rocketmq/connect/activemq/Replicator.java      |  63 +++++++
 .../connector/ActivemqSourceConnector.java         |  72 ++++++++
 .../activemq/connector/ActivemqSourceTask.java     | 141 +++++++++++++++
 .../connect/activemq/pattern/PatternProcessor.java |  89 +++++++++
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  74 ++++++++
 .../activemq/connector/ActivemqConnectorTest.java  |  58 ++++++
 .../activemq/connector/ActivemqSourceTaskTest.java | 165 +++++++++++++++++
 12 files changed, 1066 insertions(+)

diff --cc connector/rocketmq-connect-activemq/README-CN.md
index 0000000,be03683..be03683
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/README-CN.md
+++ b/connector/rocketmq-connect-activemq/README-CN.md
diff --cc connector/rocketmq-connect-activemq/README.md
index 0000000,e15149e..e15149e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/README.md
+++ b/connector/rocketmq-connect-activemq/README.md
diff --cc connector/rocketmq-connect-activemq/pom.xml
index 0000000,ccb4118..ccb4118
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/pom.xml
+++ b/connector/rocketmq-connect-activemq/pom.xml
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 0000000,30f898d..30f898d
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
index 0000000,de3b3f5..de3b3f5
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 0000000,e0ebe12..e0ebe12
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
index 0000000,7e6290b..7e6290b
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 0000000,c009274..c009274
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
diff --cc connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index 0000000,6e39a7e..6e39a7e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/connector/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
diff --cc connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index 0000000,b94237e..b94237e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ b/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
diff --cc connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
index 0000000,22b301c..22b301c
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
+++ b/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java
diff --cc connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
index 0000000,a71050c..a71050c
mode 000000,100644..100644
--- a/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java
+++ b/connector/rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTaskTest.java

[rocketmq-connect] 11/12: Change README.md (#301)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f
Author: githublaohu <23...@qq.com>
AuthorDate: Thu Jun 13 19:09:22 2019 +0800

    Change README.md (#301)
    
    * Update README-CN.md
    
    * Update README.md
---
 README-CN.md |  2 +-
 README.md    | 31 ++++++++++++++++---------------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/README-CN.md b/README-CN.md
index bd745e2..be03683 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -5,7 +5,7 @@ org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
 ##### 配置参数
 
 参数 | 作用 | 是否必填 | 默认值
----|--- |---
+---|--- |--- | ---
 activemq.url | activemq ip与端口号 | 是 | 无
 activemq.username | 用户名 | 否 |  无
 activemq.password|  密码    | 否  | 无
diff --git a/README.md b/README.md
index e965865..e15149e 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,16 @@
-参数 | 作用 | 
----|--- |---
-activemq.url | activemq ip与端口号
-activemq.username | 用户名
-activemq.password|  密码
-jms.destination.name | 读取的队列或者主题名
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题)
-jms.message.selector | 过滤器
-jms.session.acknowledge.mode | 消息确认
-jms.session.transacted | 是否是事务会话
-rocketmq.topic        | 发送的topic
-rocketmq.name         | broker的用户名
-rocketmq.sk |           
-rocketmq.ak |
-rocketmq.nameserver |  nameserver url
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+activemq.url | The URL of the ActiveMQ broker | yes | null
+activemq.username | The username to use when connecting to ActiveMQ | no |  null
+activemq.password|  The password to use when connecting to ActiveMQ    | no  | null
+jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
+jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
+jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | null | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session      | null | false