You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:56 UTC
[rocketmq-connect] 03/06: [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6801219d34b33501c70a2e76e6147c07edfd2b41
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Jul 21 21:17:48 2019 +0800
[ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
* complete RabbitMQ connector
* delete class file
---
README-CN.md | 16 --
README.md | 16 --
pom.xml | 5 -
rocketmq-connect-jms.iml | 21 +++
.../org/apache/rocketmq/connect/jms/Config.java | 4 +-
.../jms/connector/BaseJmsSourceConnector.java | 4 +-
.../connect/jms/connector/BaseJmsSourceTask.java | 4 +-
.../connect/jms/pattern/PatternProcessor.java | 4 +-
.../rocketmq/connect/jms/ReplicatorTest.java | 74 ----------
.../jms/connector/ActivemqSourceTaskTest.java | 164 ---------------------
.../jms/connector/BaseJmsSourceConnectorTest.java | 2 +-
11 files changed, 29 insertions(+), 285 deletions(-)
diff --git a/README-CN.md b/README-CN.md
deleted file mode 100644
index be03683..0000000
--- a/README-CN.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector完全限定名
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### 配置参数
-
-参数 | 作用 | 是否必填 | 默认值
----|--- |--- | ---
-activemq.url | activemq ip与端口号 | 是 | 无
-activemq.username | 用户名 | 否 | 无
-activemq.password| 密码 | 否 | 无
-jms.destination.name | 读取的队列或者主题名 | 是 | 无
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
-jms.message.selector | 过滤器 | 否 |无
-jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | 是否是事务会话 | 否 | false
diff --git a/README.md b/README.md
deleted file mode 100644
index e15149e..0000000
--- a/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector fully-qualified name
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### parameter configuration
-
-parameter | effect | required |default
----|--- |--- | ---
-activemq.url | The URL of the ActiveMQ broker | yes | null
-activemq.username | The username to use when connecting to ActiveMQ | no | null
-activemq.password| The password to use when connecting to ActiveMQ | no | null
-jms.destination.name | The name of the JMS destination (queue or topic) to read from | yes | null
-jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
-jms.message.selector | The message selector that should be applied to messages in the destination | no | null
-jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session | null | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session | null | false
diff --git a/pom.xml b/pom.xml
index c7c3ba2..50e0490 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,11 +186,6 @@
<version>1.2</version>
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-all</artifactId>
- <version>5.9.0</version>
- </dependency>
- <dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml
new file mode 100644
index 0000000..b187d53
--- /dev/null
+++ b/rocketmq-connect-jms.iml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+ <component name="EclipseModuleManager">
+ <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" />
+ <src_description expected_position="0">
+ <src_folder value="file://$MODULE_DIR$/src/main/java" expected_position="0" />
+ <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" />
+ </src_description>
+ </component>
+ <component name="NewModuleRootManager">
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <exclude-output />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="false" />
+ </content>
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" />
+ <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" />
+ </component>
+</module>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
index e9cd178..1e69f89 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -94,9 +94,7 @@ public class Config {
}
}
}
- }
-
-
+ }
public String getBrokerUrl() {
return brokerUrl;
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
index f939881..68f7677 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.jms.connector;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
@@ -31,7 +30,6 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
@Override
public String verifyAndSetConfig(KeyValue config) {
-
for (String requestKey : getRequiredConfig()) {
if (!config.containsKey(requestKey)) {
return "Request config key: " + requestKey;
@@ -69,5 +67,5 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
return config;
}
- abstract Set<String> getRequiredConfig();
+ public abstract Set<String> getRequiredConfig();
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
index f43e7fb..7b1f7c4 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -82,7 +82,6 @@ public abstract class BaseJmsSourceTask extends SourceTask {
this.config = new Config();
this.config.load(props);
this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8"));
- this.replicator = new Replicator(config,this);
this.replicator.start();
} catch (Exception e) {
log.error("activemq task start failed.", e);
@@ -143,6 +142,9 @@ public abstract class BaseJmsSourceTask extends SourceTask {
}
return ByteBuffer.wrap(data);
}
+
+
+ public abstract Config getConfig();
public abstract PatternProcessor getPatternProcessor(Replicator replicator);
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
index e03691f..5399045 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -33,7 +33,7 @@ public abstract class PatternProcessor {
private Replicator replicator;
- Config config;
+ protected Config config;
private Connection connection;
@@ -47,7 +47,7 @@ public abstract class PatternProcessor {
}
public abstract ConnectionFactory connectionFactory();
-
+
public void start() throws Exception {
if (!StringUtils.equals("topic", config.getDestinationType())
&& !StringUtils.equals("queue", config.getDestinationType())) {
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
deleted file mode 100644
index 32b8f04..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms;
-
-import java.lang.reflect.Field;
-
-import javax.jms.Message;
-
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.junit.Assert;
-
-public class ReplicatorTest {
-
- Replicator replicator;
-
- PatternProcessor patternProcessor;
-
- Config config;
-
- @Before
- public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
- config = new Config();
- replicator = new Replicator(config,null);
-
- patternProcessor = Mockito.mock(PatternProcessor.class);
-
- Field processor = Replicator.class.getDeclaredField("processor");
- processor.setAccessible(true);
- processor.set(replicator, patternProcessor);
- }
-
- @Test(expected = RuntimeException.class)
- public void startTest() throws Exception {
- replicator.start();
- }
-
- @Test
- public void stop() throws Exception {
- replicator.stop();
- Mockito.verify(patternProcessor, Mockito.times(1)).stop();
- }
-
- @Test
- public void commitAddGetQueueTest() {
- Message message = new ActiveMQTextMessage();
- replicator.commit(message, false);
- Assert.assertEquals(replicator.getQueue().poll(), message);
- }
-
- @Test
- public void getConfigTest() {
- Assert.assertEquals(replicator.getConfig(), config);
- }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
deleted file mode 100644
index 72e818a..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms.connector;
-
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.Config;
-import org.apache.rocketmq.connect.jms.Replicator;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.internal.DefaultKeyValue;
-
-public class ActivemqSourceTaskTest {
-
- public void befores() throws JMSException, InterruptedException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
- Connection connection = connectionFactory.createConnection();
-
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("test-queue");
-
- MessageProducer producer = session.createProducer(destination);
-
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- for (int i = 0; i < 20; i++) {
- TextMessage message = session.createTextMessage("hello 我是消息:" + i);
- producer.send(message);
- }
-
- session.commit();
- session.close();
- connection.close();
- }
-
- //@Test
- public void test() throws InterruptedException {
- KeyValue kv = new DefaultKeyValue();
- kv.put("activemqUrl", "tcp://112.74.48.251:6166");
- kv.put("destinationType", "queue");
- kv.put("destinationName", "test-queue");
- ActivemqSourceTask task = new ActivemqSourceTask();
- task.start(kv);
- for (int i = 0; i < 20; ) {
- Collection<SourceDataEntry> sourceDataEntry = task.poll();
- i = i + sourceDataEntry.size();
- System.out.println(sourceDataEntry);
- }
- Thread.sleep(20000);
- }
-
- @Test
- public void pollTest() throws Exception {
- ActivemqSourceTask task = new ActivemqSourceTask();
- TextMessage textMessage = new ActiveMQTextMessage();
- textMessage.setText("hello rocketmq");
-
- Replicator replicatorObject = Mockito.mock(Replicator.class);
- BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
- Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
-
- Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
- replicator.setAccessible(true);
- replicator.set(task, replicatorObject);
-
- Field config = ActivemqSourceTask.class.getDeclaredField("config");
- config.setAccessible(true);
- config.set(task, new Config());
-
- queue.put(textMessage);
- Collection<SourceDataEntry> list = task.poll();
- Assert.assertEquals(list.size(), 1);
-
- list = task.poll();
- Assert.assertEquals(list.size(), 0);
-
- }
-
- @Test(expected = RuntimeException.class)
- public void getMessageConnentTest() throws JMSException {
- String value = "hello rocketmq";
- ActivemqSourceTask task = new ActivemqSourceTask();
- TextMessage textMessage = new ActiveMQTextMessage();
- textMessage.setText(value);
- ByteBuffer buffer = task.getMessageConnent(textMessage);
- Assert.assertEquals(new String(buffer.array()), textMessage.getText());
-
- ObjectMessage objectMessage = new ActiveMQObjectMessage();
- objectMessage.setObject(value);
- buffer = task.getMessageConnent(objectMessage);
- Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
-
- BytesMessage bytes = new ActiveMQBytesMessage();
- bytes.writeBytes(value.getBytes());
- bytes.reset();
- buffer = task.getMessageConnent(bytes);
- Assert.assertEquals(new String(buffer.array()), value);
-
- MapMessage mapMessage = new ActiveMQMapMessage();
- mapMessage.setString("hello", "rocketmq");
- buffer = task.getMessageConnent(mapMessage);
- Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
- Assert.assertEquals(map.get("hello"), "rocketmq");
- Assert.assertEquals(map.size(), 1);
-
- StreamMessage streamMessage = new ActiveMQStreamMessage();
- String valueTwo = null;
- for (int i = 0; i < 200; i++) {
- valueTwo = valueTwo + value;
- }
- streamMessage.writeBytes(valueTwo.getBytes());
- streamMessage.reset();
- buffer = task.getMessageConnent(streamMessage);
- Assert.assertEquals(new String(buffer.array()), valueTwo);
-
- task.getMessageConnent(null);
- }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
index 6c4029a..632c9d3 100644
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -48,7 +48,7 @@ public class BaseJmsSourceConnectorTest {
}
@Override
- Set<String> getRequiredConfig() {
+ public Set<String> getRequiredConfig() {
return REQUEST_CONFIG;
}
};