You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:23:08 UTC

[rocketmq-connect] branch master updated (232c585 -> 5feab9c)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git.


    from 232c585  Add 'connector/rocketmq-connect-mongo/' from commit 'b307466f3fc4fa25ba21de87c568a4095cf18f4f'
     new 4e42a37  Initial rocketmq-connect-rabbitmq
     new a7ab1c4  [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
     new 5feab9c  Add 'connector/rocketmq-connect-rabbitmq/' from commit 'a7ab1c46a9938df2984c3ad730609c1a7797cc9b'

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../README.md                                      |   9 +-
 connector/rocketmq-connect-rabbitmq/pom.xml        | 205 +++++++++++++++++++++
 .../rocketmq/connect/rabbitmq}/ErrorCode.java      |   2 +-
 .../rocketmq/connect/rabbitmq/RabbitmqConfig.java  |  61 ++++++
 .../connector/RabbitmqSourceConnector.java         |  30 +--
 .../rabbitmq/connector/RabbitmqSourceTask.java}    |  33 ++--
 .../rabbitmq/pattern/RabbitMQPatternProcessor.java |  48 +++++
 .../rocketmq/connect/jms/RabbitmqConfigTest.java}  |  13 +-
 .../connector/RabbitmqSourceConnectorTest.java}    |  52 +++---
 .../rabbitmq/connector/RabbitmqSourceTaskTest.java | 164 +++++++++++++++++
 .../pattern/RabbitMQPatternProcessorTest.java}     |  34 ++--
 11 files changed, 554 insertions(+), 97 deletions(-)
 copy connector/{rocketmq-connect-activemq => rocketmq-connect-rabbitmq}/README.md (71%)
 create mode 100644 connector/rocketmq-connect-rabbitmq/pom.xml
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq => rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq}/ErrorCode.java (74%)
 create mode 100644 connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java (65%)
 copy connector/{rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java => rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java} (55%)
 create mode 100644 connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
 copy connector/{rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java => rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java} (82%)
 copy connector/{rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java => rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java} (51%)
 create mode 100644 connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
 copy connector/{rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java => rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java} (53%)

[rocketmq-connect] 02/03: [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit a7ab1c46a9938df2984c3ad730609c1a7797cc9b
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Jul 21 21:17:48 2019 +0800

    [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
    
    * complete RabbitMQ  connector
    
    * delete class file
---
 README.md                                          |  16 ++
 pom.xml                                            | 205 +++++++++++++++++++++
 .../rocketmq/connect/rabbitmq/ErrorCode.java       |   8 +
 .../rocketmq/connect/rabbitmq/RabbitmqConfig.java  |  61 ++++++
 .../connector/RabbitmqSourceConnector.java         |  35 ++++
 .../rabbitmq/connector/RabbitmqSourceTask.java     |  37 ++++
 .../rabbitmq/pattern/RabbitMQPatternProcessor.java |  48 +++++
 .../rocketmq/connect/jms/RabbitmqConfigTest.java   |  28 +++
 .../connector/RabbitmqSourceConnectorTest.java     |  54 ++++++
 .../rabbitmq/connector/RabbitmqSourceTaskTest.java | 164 +++++++++++++++++
 .../pattern/RabbitMQPatternProcessorTest.java      |  41 +++++
 11 files changed, 697 insertions(+)

diff --git a/README.md b/README.md
index 8b13789..708bee3 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,17 @@
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.rabbitmq.connector.RabbitmqSourceConnector
+
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+rabbtimq.url | The URL of the RabbtiMQ broker | yes | null
+rabbtimq.username | The username to use when connecting to RabbtiMQ | yes |  null
+rabbtimq.password|  The password to use when connecting to RabbtiMQ    | yes  | null
+jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
+jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
+jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | null | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session      | null | false
 
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d5687c1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.rocketmq</groupId>
+	<artifactId>rocketmq-connect-rabbitmq</artifactId>
+	<version>1.0.0</version>
+
+	<name>connect-rabbitmq</name>
+	<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-rabbitmq</url>
+
+	<licenses>
+		<license>
+			<name>The Apache Software License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+		</license>
+	</licenses>
+
+	<issueManagement>
+		<system>jira</system>
+		<url>https://issues.apache.org/jira/browse/RocketMQ</url>
+	</issueManagement>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+		<!-- Compiler settings properties -->
+		<maven.compiler.source>1.8</maven.compiler.source>
+		<maven.compiler.target>1.8</maven.compiler.target>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>versions-maven-plugin</artifactId>
+				<version>2.3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>clirr-maven-plugin</artifactId>
+				<version>2.7</version>
+			</plugin>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.6.1</version>
+				<configuration>
+					<source>${maven.compiler.source}</source>
+					<target>${maven.compiler.target}</target>
+					<compilerVersion>${maven.compiler.source}</compilerVersion>
+					<showDeprecation>true</showDeprecation>
+					<showWarnings>true</showWarnings>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19.1</version>
+				<configuration>
+					<argLine>-Xms512m -Xmx1024m</argLine>
+					<forkMode>always</forkMode>
+					<includes>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-site-plugin</artifactId>
+				<version>3.6</version>
+				<configuration>
+					<locales>en_US</locales>
+					<outputEncoding>UTF-8</outputEncoding>
+					<inputEncoding>UTF-8</inputEncoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-source-plugin</artifactId>
+				<version>3.0.1</version>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<version>2.10.4</version>
+				<configuration>
+					<charset>UTF-8</charset>
+					<locale>en_US</locale>
+					<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+				</configuration>
+				<executions>
+					<execution>
+						<id>aggregate</id>
+						<goals>
+							<goal>aggregate</goal>
+						</goals>
+						<phase>site</phase>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<version>3.0.2</version>
+				<configuration>
+					<encoding>${project.build.sourceEncoding}</encoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>findbugs-maven-plugin</artifactId>
+				<version>3.0.4</version>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<version>2.6.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>2.6.3</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>io.openmessaging</groupId>
+			<artifactId>openmessaging-connector</artifactId>
+			<version>0.1.0-beta</version>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>fastjson</artifactId>
+			<version>1.2.51</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.7.7</version>
+		</dependency>
+		<dependency>
+			<groupId>ch.qos.logback</groupId>
+			<artifactId>logback-classic</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
+			<groupId>ch.qos.logback</groupId>
+			<artifactId>logback-core</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-openmessaging</artifactId>
+			<version>4.3.2</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-connect-jms</artifactId>
+			<version>1.0.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+			<version>5.7.1</version>
+		</dependency>
+		<dependency>
+			<groupId>com.rabbitmq.jms</groupId>
+			<artifactId>rabbitmq-jms</artifactId>
+			<version>1.11.2</version>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
new file mode 100644
index 0000000..5f70361
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.rabbitmq;
+
+public class ErrorCode {
+
+    public static final int START_ERROR_CODE = 10001;
+
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
new file mode 100644
index 0000000..2b12c18
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.connect.jms.Config;
+
+public class RabbitmqConfig extends Config {
+
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("rabbitmqUrl");
+            add("rabbitmqUsername");
+            add("rabbitmqPassword");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String getRabbitmqUrl() {
+        return getBrokerUrl();
+    }
+
+    public void setRabbitmqUrl(String rabbitmqUrl) {
+        setBrokerUrl(rabbitmqUrl);
+    }
+
+    public String getRabbitmqUsername() {
+        return getUsername();
+    }
+
+    public void setRabbitmqUsername(String rabbitmqUsername) {
+        setUsername(rabbitmqUsername);
+    }
+
+    public String getRabbitmqPassword() {
+        return getPassword();
+    }
+
+    public void setRabbitmqPassword(String rabbitmqPassword) {
+        setPassword(rabbitmqPassword);
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
new file mode 100644
index 0000000..328632d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import io.openmessaging.connector.api.Task;
+import java.util.Set;
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceConnector;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+
+public class RabbitmqSourceConnector extends BaseJmsSourceConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return RabbitmqSourceTask.class;
+    }
+
+    public Set<String> getRequiredConfig() {
+        return RabbitmqConfig.REQUEST_CONFIG;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
new file mode 100644
index 0000000..ab2d1e4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor;
+
+public class RabbitmqSourceTask extends BaseJmsSourceTask {
+
+    public PatternProcessor getPatternProcessor(Replicator replicator) {
+        return new RabbitMQPatternProcessor(replicator);
+    }
+
+    @Override
+    public Config getConfig() {
+        return new RabbitmqConfig();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
new file mode 100644
index 0000000..5056a11
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.pattern;
+
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+import io.openmessaging.connector.api.exception.DataConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.connect.jms.ErrorCode;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+
+public class RabbitMQPatternProcessor extends PatternProcessor {
+
+    public RabbitMQPatternProcessor(Replicator replicator) {
+        super(replicator);
+    }
+
+    public ConnectionFactory connectionFactory() {
+        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
+        try {
+            List<String> urlList = new ArrayList<>();
+            urlList.add(config.getBrokerUrl());
+            connectionFactory.setUris(urlList);
+        } catch (JMSException e) {
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
+        }
+        return connectionFactory;
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
new file mode 100644
index 0000000..7228f68
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+
+public class RabbitmqConfigTest {
+	
+    RabbitmqConfig config;
+
+   
+    
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
new file mode 100644
index 0000000..ea52a43
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class RabbitmqSourceConnectorTest {
+
+	RabbitmqSourceConnector rabbitmqSourceConnector = new RabbitmqSourceConnector();
+	
+	@Test
+	public void taskClass() {
+		 assertEquals( RabbitmqSourceTask.class, rabbitmqSourceConnector.taskClass());
+	}
+
+	@Test
+	public void getRequiredConfig() {
+		assertEquals( RabbitmqConfig.REQUEST_CONFIG , rabbitmqSourceConnector.getRequiredConfig());
+	}
+	
+	
+	@Test
+	public void verifyAndSetConfig() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey :RabbitmqConfig.REQUEST_CONFIG) {
+            assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals(rabbitmqSourceConnector.verifyAndSetConfig(keyValue), "");
+	}
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
new file mode 100644
index 0000000..232ddc1
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.apache.rocketmq.connect.rabbitmq.pattern.RabbitMQPatternProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+import com.rabbitmq.jms.client.message.RMQBytesMessage;
+import com.rabbitmq.jms.client.message.RMQMapMessage;
+import com.rabbitmq.jms.client.message.RMQObjectMessage;
+import com.rabbitmq.jms.client.message.RMQStreamMessage;
+import com.rabbitmq.jms.client.message.RMQTextMessage;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class RabbitmqSourceTaskTest {
+
+	//@Before
+	public void befores() throws JMSException, InterruptedException {
+		RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
+		connectionFactory.setUri("amqp://112.74.48.251:5672");
+		Connection connection = connectionFactory.createConnection("admin", "admin");
+
+		connection.start();
+		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+		Destination destination = session.createQueue("test-queue");
+
+		MessageProducer producer = session.createProducer(destination);
+
+		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		for (int i = 0; i < 20; i++) {
+			TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+			producer.send(message);
+		}
+
+		session.commit();
+		session.close();
+		connection.close();
+	}
+
+	//@Test
+	public void test() throws InterruptedException {
+		KeyValue kv = new DefaultKeyValue();
+		kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672");
+		kv.put("rabbitmqUsername", "admin");
+		kv.put("rabbitmqPassword", "admin");
+		kv.put("destinationType", "queue");
+		kv.put("destinationName", "test-queue");
+		RabbitmqSourceTask task = new RabbitmqSourceTask();
+		task.start(kv);
+		for (int i = 0; i < 20;) {
+			Collection<SourceDataEntry> sourceDataEntry = task.poll();
+			i = i + sourceDataEntry.size();
+			System.out.println(sourceDataEntry);
+		}
+		Thread.sleep(20000);
+	}
+
+	@Test
+	public void getMessageConnentTest() throws JMSException {
+		String value = "hello rocketmq";
+		RabbitmqSourceTask task = new RabbitmqSourceTask();
+		RMQTextMessage textMessage = new RMQTextMessage();
+		textMessage.setText(value);
+		ByteBuffer buffer = task.getMessageContent(textMessage);
+		Assert.assertEquals(new String(buffer.array()), textMessage.getText());
+
+		ObjectMessage objectMessage = new RMQObjectMessage();
+		objectMessage.setObject(value);
+		buffer = task.getMessageContent(objectMessage);
+		Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
+
+		BytesMessage bytes = new RMQBytesMessage();
+		bytes.writeBytes(value.getBytes());
+		bytes.reset();
+		buffer = task.getMessageContent(bytes);
+		Assert.assertEquals(new String(buffer.array()), value);
+
+		MapMessage mapMessage = new RMQMapMessage();
+		mapMessage.setString("hello", "rocketmq");
+		buffer = task.getMessageContent(mapMessage);
+		Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+		Assert.assertEquals(map.get("hello"), "rocketmq");
+		Assert.assertEquals(map.size(), 1);
+
+		StreamMessage streamMessage = new RMQStreamMessage();
+		String valueTwo = null;
+		for (int i = 0; i < 200; i++) {
+			valueTwo = valueTwo + value;
+		}
+		streamMessage.writeBytes(valueTwo.getBytes());
+		streamMessage.reset();
+		//buffer = task.getMessageContent(streamMessage);
+		//Assert.assertEquals(new String(buffer.array()), valueTwo);
+
+	}
+	
+	@Test(expected=Exception.class)
+	public void getMessageConnentException() throws JMSException {
+		RabbitmqSourceTask task = new RabbitmqSourceTask();
+		task.getMessageContent(null);
+		
+	}
+
+	public void getPatternProcessor(Replicator replicator) {
+		KeyValue kv = new DefaultKeyValue();
+		kv.put("rabbitmqUrl", "amqp://112.74.48.251:5672");
+		kv.put("rabbitmqUsername", "admin");
+		kv.put("rabbitmqPassword", "admin");
+		kv.put("destinationType", "queue");
+		kv.put("destinationName", "test-queue");
+        RabbitmqConfig config = new RabbitmqConfig();
+        config.load(kv);
+        replicator = new Replicator(config,null);
+        RabbitmqSourceTask task = new RabbitmqSourceTask();
+		assertEquals(RabbitMQPatternProcessor.class, task.getPatternProcessor(replicator).getClass());
+	}
+
+	@Test
+	public void getConfig() {
+		RabbitmqSourceTask task = new RabbitmqSourceTask();
+		assertEquals(task.getConfig().getClass() , RabbitmqConfig.class);
+	}
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
new file mode 100644
index 0000000..a23d233
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.rabbitmq.pattern;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.rabbitmq.RabbitmqConfig;
+import org.junit.Test;
+
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+
+public  class RabbitMQPatternProcessorTest{
+
+
+	@Test
+	public  void connectionFactory() {
+		RabbitmqConfig rabbitmqConfig = new RabbitmqConfig();
+		rabbitmqConfig.setRabbitmqUrl("amqp://112.74.48.251:5672");
+		Replicator replicator = new Replicator(rabbitmqConfig, null);
+		RabbitMQPatternProcessor patternProcessor = new RabbitMQPatternProcessor(replicator);
+		assertEquals(RMQConnectionFactory.class, patternProcessor.connectionFactory().getClass());
+    }
+    
+
+}

[rocketmq-connect] 03/03: Add 'connector/rocketmq-connect-rabbitmq/' from commit 'a7ab1c46a9938df2984c3ad730609c1a7797cc9b'

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 5feab9cd053c7fa6b7b64cfeac5af860f58bd725
Merge: 232c585 a7ab1c4
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:22:43 2022 +0800

    Add 'connector/rocketmq-connect-rabbitmq/' from commit 'a7ab1c46a9938df2984c3ad730609c1a7797cc9b'
    
    git-subtree-dir: connector/rocketmq-connect-rabbitmq
    git-subtree-mainline: 232c585fdee7e6a0ab6d7aef04140019bc9a3be7
    git-subtree-split: a7ab1c46a9938df2984c3ad730609c1a7797cc9b

 connector/rocketmq-connect-rabbitmq/README.md      |  17 ++
 connector/rocketmq-connect-rabbitmq/pom.xml        | 205 +++++++++++++++++++++
 .../rocketmq/connect/rabbitmq/ErrorCode.java       |   8 +
 .../rocketmq/connect/rabbitmq/RabbitmqConfig.java  |  61 ++++++
 .../connector/RabbitmqSourceConnector.java         |  35 ++++
 .../rabbitmq/connector/RabbitmqSourceTask.java     |  37 ++++
 .../rabbitmq/pattern/RabbitMQPatternProcessor.java |  48 +++++
 .../rocketmq/connect/jms/RabbitmqConfigTest.java   |  28 +++
 .../connector/RabbitmqSourceConnectorTest.java     |  54 ++++++
 .../rabbitmq/connector/RabbitmqSourceTaskTest.java | 164 +++++++++++++++++
 .../pattern/RabbitMQPatternProcessorTest.java      |  41 +++++
 11 files changed, 698 insertions(+)

diff --cc connector/rocketmq-connect-rabbitmq/README.md
index 0000000,708bee3..708bee3
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/README.md
+++ b/connector/rocketmq-connect-rabbitmq/README.md
diff --cc connector/rocketmq-connect-rabbitmq/pom.xml
index 0000000,d5687c1..d5687c1
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/pom.xml
+++ b/connector/rocketmq-connect-rabbitmq/pom.xml
diff --cc connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
index 0000000,5f70361..5f70361
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
+++ b/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/ErrorCode.java
diff --cc connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
index 0000000,2b12c18..2b12c18
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
+++ b/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/RabbitmqConfig.java
diff --cc connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
index 0000000,328632d..328632d
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
+++ b/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnector.java
diff --cc connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
index 0000000,ab2d1e4..ab2d1e4
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
+++ b/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTask.java
diff --cc connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
index 0000000,5056a11..5056a11
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
+++ b/connector/rocketmq-connect-rabbitmq/src/main/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessor.java
diff --cc connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
index 0000000,7228f68..7228f68
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
+++ b/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/jms/RabbitmqConfigTest.java
diff --cc connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
index 0000000,ea52a43..ea52a43
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
+++ b/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceConnectorTest.java
diff --cc connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
index 0000000,232ddc1..232ddc1
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
+++ b/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/connector/RabbitmqSourceTaskTest.java
diff --cc connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
index 0000000,a23d233..a23d233
mode 000000,100644..100644
--- a/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java
+++ b/connector/rocketmq-connect-rabbitmq/src/test/java/org/apache/rocketmq/connect/rabbitmq/pattern/RabbitMQPatternProcessorTest.java

[rocketmq-connect] 01/03: Initial rocketmq-connect-rabbitmq

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 4e42a37e825e4575c8155e5aac47707280fb6655
Author: duheng.dh <du...@alibaba-inc.com>
AuthorDate: Thu Jun 13 19:24:25 2019 +0800

    Initial rocketmq-connect-rabbitmq
---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+