You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:56 UTC

[rocketmq-connect] 03/06: [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 6801219d34b33501c70a2e76e6147c07edfd2b41
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Jul 21 21:17:48 2019 +0800

    [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
    
    * complete RabbitMQ  connector
    
    * delete class file
---
 README-CN.md                                       |  16 --
 README.md                                          |  16 --
 pom.xml                                            |   5 -
 rocketmq-connect-jms.iml                           |  21 +++
 .../org/apache/rocketmq/connect/jms/Config.java    |   4 +-
 .../jms/connector/BaseJmsSourceConnector.java      |   4 +-
 .../connect/jms/connector/BaseJmsSourceTask.java   |   4 +-
 .../connect/jms/pattern/PatternProcessor.java      |   4 +-
 .../rocketmq/connect/jms/ReplicatorTest.java       |  74 ----------
 .../jms/connector/ActivemqSourceTaskTest.java      | 164 ---------------------
 .../jms/connector/BaseJmsSourceConnectorTest.java  |   2 +-
 11 files changed, 29 insertions(+), 285 deletions(-)

diff --git a/README-CN.md b/README-CN.md
deleted file mode 100644
index be03683..0000000
--- a/README-CN.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector完全限定名
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### 配置参数
-
-参数 | 作用 | 是否必填 | 默认值
----|--- |--- | ---
-activemq.url | activemq ip与端口号 | 是 | 无
-activemq.username | 用户名 | 否 |  无
-activemq.password|  密码    | 否  | 无
-jms.destination.name | 读取的队列或者主题名   |  是 | 无
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
-jms.message.selector | 过滤器    |  否  |无
-jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
deleted file mode 100644
index e15149e..0000000
--- a/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector fully-qualified name
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### parameter configuration
-
-parameter | effect | required |default
----|--- |--- | ---
-activemq.url | The URL of the ActiveMQ broker | yes | null
-activemq.username | The username to use when connecting to ActiveMQ | no |  null
-activemq.password|  The password to use when connecting to ActiveMQ    | no  | null
-jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
-jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
-jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
-jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | null | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session      | null | false
diff --git a/pom.xml b/pom.xml
index c7c3ba2..50e0490 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,11 +186,6 @@
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-all</artifactId>
-            <version>5.9.0</version>
-        </dependency>
-        <dependency>
             <groupId>javax.jms</groupId>
             <artifactId>javax.jms-api</artifactId>
             <version>2.0</version>
diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml
new file mode 100644
index 0000000..b187d53
--- /dev/null
+++ b/rocketmq-connect-jms.iml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+  <component name="EclipseModuleManager">
+    <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" />
+    <src_description expected_position="0">
+      <src_folder value="file://$MODULE_DIR$/src/main/java" expected_position="0" />
+      <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" />
+    </src_description>
+  </component>
+  <component name="NewModuleRootManager">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <exclude-output />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="false" />
+    </content>
+    <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" />
+    <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" />
+  </component>
+</module>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
index e9cd178..1e69f89 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -94,9 +94,7 @@ public class Config {
                 }
             }
         }
-    }
-
-    
+    }    
     
     public String getBrokerUrl() {
 		return brokerUrl;
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
index f939881..68f7677 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.jms.connector;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
@@ -31,7 +30,6 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
-    	
         for (String requestKey : getRequiredConfig()) {
             if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
@@ -69,5 +67,5 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
         return config;
     }
     
-    abstract Set<String> getRequiredConfig();
+    public abstract Set<String> getRequiredConfig();
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
index f43e7fb..7b1f7c4 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -82,7 +82,6 @@ public abstract class BaseJmsSourceTask extends SourceTask {
             this.config = new Config();
             this.config.load(props);
             this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8"));
-            this.replicator = new Replicator(config,this);
             this.replicator.start();
         } catch (Exception e) {
             log.error("activemq task start failed.", e);
@@ -143,6 +142,9 @@ public abstract class BaseJmsSourceTask extends SourceTask {
         }
         return ByteBuffer.wrap(data);
     }
+
+
+    public abstract Config getConfig();
     
     public abstract PatternProcessor getPatternProcessor(Replicator replicator);
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
index e03691f..5399045 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -33,7 +33,7 @@ public abstract class PatternProcessor {
 
     private Replicator replicator;
 
-    Config config;
+    protected Config config;
 
     private Connection connection;
 
@@ -47,7 +47,7 @@ public abstract class PatternProcessor {
     }
 
     public abstract ConnectionFactory connectionFactory();
-    
+
     public void start() throws Exception {
         if (!StringUtils.equals("topic", config.getDestinationType())
             && !StringUtils.equals("queue", config.getDestinationType())) {
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
deleted file mode 100644
index 32b8f04..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms;
-
-import java.lang.reflect.Field;
-
-import javax.jms.Message;
-
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.junit.Assert;
-
-public class ReplicatorTest {
-
-    Replicator replicator;
-
-    PatternProcessor patternProcessor;
-
-    Config config;
-
-    @Before
-    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
-        config = new Config();
-        replicator = new Replicator(config,null);
-
-        patternProcessor = Mockito.mock(PatternProcessor.class);
-
-        Field processor = Replicator.class.getDeclaredField("processor");
-        processor.setAccessible(true);
-        processor.set(replicator, patternProcessor);
-    }
-
-    @Test(expected = RuntimeException.class)
-    public void startTest() throws Exception {
-        replicator.start();
-    }
-
-    @Test
-    public void stop() throws Exception {
-        replicator.stop();
-        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
-    }
-
-    @Test
-    public void commitAddGetQueueTest() {
-        Message message = new ActiveMQTextMessage();
-        replicator.commit(message, false);
-        Assert.assertEquals(replicator.getQueue().poll(), message);
-    }
-
-    @Test
-    public void getConfigTest() {
-        Assert.assertEquals(replicator.getConfig(), config);
-    }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
deleted file mode 100644
index 72e818a..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms.connector;
-
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.Config;
-import org.apache.rocketmq.connect.jms.Replicator;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.internal.DefaultKeyValue;
-
-public class ActivemqSourceTaskTest {
-
-    public void befores() throws JMSException, InterruptedException {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
-        Connection connection = connectionFactory.createConnection();
-
-        connection.start();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createQueue("test-queue");
-
-        MessageProducer producer = session.createProducer(destination);
-
-        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        for (int i = 0; i < 20; i++) {
-            TextMessage message = session.createTextMessage("hello 我是消息:" + i);
-            producer.send(message);
-        }
-
-        session.commit();
-        session.close();
-        connection.close();
-    }
-
-    //@Test
-    public void test() throws InterruptedException {
-        KeyValue kv = new DefaultKeyValue();
-        kv.put("activemqUrl", "tcp://112.74.48.251:6166");
-        kv.put("destinationType", "queue");
-        kv.put("destinationName", "test-queue");
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        task.start(kv);
-        for (int i = 0; i < 20; ) {
-            Collection<SourceDataEntry> sourceDataEntry = task.poll();
-            i = i + sourceDataEntry.size();
-            System.out.println(sourceDataEntry);
-        }
-        Thread.sleep(20000);
-    }
-
-    @Test
-    public void pollTest() throws Exception {
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        TextMessage textMessage = new ActiveMQTextMessage();
-        textMessage.setText("hello rocketmq");
-
-        Replicator replicatorObject = Mockito.mock(Replicator.class);
-        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
-        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
-
-        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
-        replicator.setAccessible(true);
-        replicator.set(task, replicatorObject);
-
-        Field config = ActivemqSourceTask.class.getDeclaredField("config");
-        config.setAccessible(true);
-        config.set(task, new Config());
-
-        queue.put(textMessage);
-        Collection<SourceDataEntry> list = task.poll();
-        Assert.assertEquals(list.size(), 1);
-
-        list = task.poll();
-        Assert.assertEquals(list.size(), 0);
-
-    }
-
-    @Test(expected = RuntimeException.class)
-    public void getMessageConnentTest() throws JMSException {
-        String value = "hello rocketmq";
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        TextMessage textMessage = new ActiveMQTextMessage();
-        textMessage.setText(value);
-        ByteBuffer buffer = task.getMessageConnent(textMessage);
-        Assert.assertEquals(new String(buffer.array()), textMessage.getText());
-
-        ObjectMessage objectMessage = new ActiveMQObjectMessage();
-        objectMessage.setObject(value);
-        buffer = task.getMessageConnent(objectMessage);
-        Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
-
-        BytesMessage bytes = new ActiveMQBytesMessage();
-        bytes.writeBytes(value.getBytes());
-        bytes.reset();
-        buffer = task.getMessageConnent(bytes);
-        Assert.assertEquals(new String(buffer.array()), value);
-
-        MapMessage mapMessage = new ActiveMQMapMessage();
-        mapMessage.setString("hello", "rocketmq");
-        buffer = task.getMessageConnent(mapMessage);
-        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
-        Assert.assertEquals(map.get("hello"), "rocketmq");
-        Assert.assertEquals(map.size(), 1);
-
-        StreamMessage streamMessage = new ActiveMQStreamMessage();
-        String valueTwo = null;
-        for (int i = 0; i < 200; i++) {
-            valueTwo = valueTwo + value;
-        }
-        streamMessage.writeBytes(valueTwo.getBytes());
-        streamMessage.reset();
-        buffer = task.getMessageConnent(streamMessage);
-        Assert.assertEquals(new String(buffer.array()), valueTwo);
-
-        task.getMessageConnent(null);
-    }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
index 6c4029a..632c9d3 100644
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -48,7 +48,7 @@ public class BaseJmsSourceConnectorTest {
 		}
 		
 		@Override
-		Set<String> getRequiredConfig() {
+		public Set<String> getRequiredConfig() {
 			return REQUEST_CONFIG;
 		}
 	};