You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:53 UTC

[rocketmq-connect] branch master updated (24c083e -> b3cbb0b)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git.


    from 24c083e  Add 'connector/rocketmq-connect-jdbc/' from commit '6708ada617d9f6cfef5ca42a3c2f97af44603a89'
     new 8b8c95f  Initial rocketmq-connect-jms project
     new 78cc902  [ISSUE #302] Implement rocketmq connect jms (#303)
     new 6801219  [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
     new 4e3ccda  fix(connect-runtime) remove unused local variable
     new 37a4db7  fix(connect-jms) should put .iml file in gitignore
     new b3cbb0b  Add 'connector/rocketmq-connect-jms/' from commit '37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d'

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pom.xml                                        | 11 ++---
 .../org/apache/rocketmq/connect/jms}/Config.java   | 52 +++++++++++-----------
 .../apache/rocketmq/connect/jms}/ErrorCode.java    |  2 +-
 .../apache/rocketmq/connect/jms}/Replicator.java   | 12 +++--
 .../jms/connector/BaseJmsSourceConnector.java}     | 19 ++++----
 .../connect/jms/connector/BaseJmsSourceTask.java}  | 37 +++++++++------
 .../connect/jms}/pattern/PatternProcessor.java     | 27 +++++------
 .../jms/connector/BaseJmsSourceConnectorTest.java} | 29 ++++++------
 8 files changed, 100 insertions(+), 89 deletions(-)
 copy connector/{rocketmq-connect-activemq => rocketmq-connect-jms}/pom.xml (95%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms}/Config.java (84%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms}/ErrorCode.java (74%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms}/Replicator.java (80%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceConnector.java => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java} (83%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java} (88%)
 copy connector/{rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq => rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms}/pattern/PatternProcessor.java (80%)
 copy connector/{rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java => rocketmq-connect-jms/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java} (80%)

[rocketmq-connect] 03/06: [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 6801219d34b33501c70a2e76e6147c07edfd2b41
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Jul 21 21:17:48 2019 +0800

    [ISSUE #312] Implement rocketmq connect RabbitMQ (#313)
    
    * complete RabbitMQ  connector
    
    * delete class file
---
 README-CN.md                                       |  16 --
 README.md                                          |  16 --
 pom.xml                                            |   5 -
 rocketmq-connect-jms.iml                           |  21 +++
 .../org/apache/rocketmq/connect/jms/Config.java    |   4 +-
 .../jms/connector/BaseJmsSourceConnector.java      |   4 +-
 .../connect/jms/connector/BaseJmsSourceTask.java   |   4 +-
 .../connect/jms/pattern/PatternProcessor.java      |   4 +-
 .../rocketmq/connect/jms/ReplicatorTest.java       |  74 ----------
 .../jms/connector/ActivemqSourceTaskTest.java      | 164 ---------------------
 .../jms/connector/BaseJmsSourceConnectorTest.java  |   2 +-
 11 files changed, 29 insertions(+), 285 deletions(-)

diff --git a/README-CN.md b/README-CN.md
deleted file mode 100644
index be03683..0000000
--- a/README-CN.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector完全限定名
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### 配置参数
-
-参数 | 作用 | 是否必填 | 默认值
----|--- |--- | ---
-activemq.url | activemq ip与端口号 | 是 | 无
-activemq.username | 用户名 | 否 |  无
-activemq.password|  密码    | 否  | 无
-jms.destination.name | 读取的队列或者主题名   |  是 | 无
-jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
-jms.message.selector | 过滤器    |  否  |无
-jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
deleted file mode 100644
index e15149e..0000000
--- a/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-##### ActiveConnector fully-qualified name
-org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
-
-
-##### parameter configuration
-
-parameter | effect | required |default
----|--- |--- | ---
-activemq.url | The URL of the ActiveMQ broker | yes | null
-activemq.username | The username to use when connecting to ActiveMQ | no |  null
-activemq.password|  The password to use when connecting to ActiveMQ    | no  | null
-jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
-jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
-jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
-jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | null | Session.AUTO_ACKNOWLEDGE
-jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session      | null | false
diff --git a/pom.xml b/pom.xml
index c7c3ba2..50e0490 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,11 +186,6 @@
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-all</artifactId>
-            <version>5.9.0</version>
-        </dependency>
-        <dependency>
             <groupId>javax.jms</groupId>
             <artifactId>javax.jms-api</artifactId>
             <version>2.0</version>
diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml
new file mode 100644
index 0000000..b187d53
--- /dev/null
+++ b/rocketmq-connect-jms.iml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+  <component name="EclipseModuleManager">
+    <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" />
+    <src_description expected_position="0">
+      <src_folder value="file://$MODULE_DIR$/src/main/java" expected_position="0" />
+      <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" />
+    </src_description>
+  </component>
+  <component name="NewModuleRootManager">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <exclude-output />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="false" />
+    </content>
+    <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" />
+    <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" />
+  </component>
+</module>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
index e9cd178..1e69f89 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -94,9 +94,7 @@ public class Config {
                 }
             }
         }
-    }
-
-    
+    }    
     
     public String getBrokerUrl() {
 		return brokerUrl;
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
index f939881..68f7677 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.jms.connector;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
@@ -31,7 +30,6 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
-    	
         for (String requestKey : getRequiredConfig()) {
             if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
@@ -69,5 +67,5 @@ public abstract class BaseJmsSourceConnector extends SourceConnector {
         return config;
     }
     
-    abstract Set<String> getRequiredConfig();
+    public abstract Set<String> getRequiredConfig();
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
index f43e7fb..7b1f7c4 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -82,7 +82,6 @@ public abstract class BaseJmsSourceTask extends SourceTask {
             this.config = new Config();
             this.config.load(props);
             this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8"));
-            this.replicator = new Replicator(config,this);
             this.replicator.start();
         } catch (Exception e) {
             log.error("activemq task start failed.", e);
@@ -143,6 +142,9 @@ public abstract class BaseJmsSourceTask extends SourceTask {
         }
         return ByteBuffer.wrap(data);
     }
+
+
+    public abstract Config getConfig();
     
     public abstract PatternProcessor getPatternProcessor(Replicator replicator);
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
index e03691f..5399045 100644
--- a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -33,7 +33,7 @@ public abstract class PatternProcessor {
 
     private Replicator replicator;
 
-    Config config;
+    protected Config config;
 
     private Connection connection;
 
@@ -47,7 +47,7 @@ public abstract class PatternProcessor {
     }
 
     public abstract ConnectionFactory connectionFactory();
-    
+
     public void start() throws Exception {
         if (!StringUtils.equals("topic", config.getDestinationType())
             && !StringUtils.equals("queue", config.getDestinationType())) {
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
deleted file mode 100644
index 32b8f04..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms;
-
-import java.lang.reflect.Field;
-
-import javax.jms.Message;
-
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.junit.Assert;
-
-public class ReplicatorTest {
-
-    Replicator replicator;
-
-    PatternProcessor patternProcessor;
-
-    Config config;
-
-    @Before
-    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
-        config = new Config();
-        replicator = new Replicator(config,null);
-
-        patternProcessor = Mockito.mock(PatternProcessor.class);
-
-        Field processor = Replicator.class.getDeclaredField("processor");
-        processor.setAccessible(true);
-        processor.set(replicator, patternProcessor);
-    }
-
-    @Test(expected = RuntimeException.class)
-    public void startTest() throws Exception {
-        replicator.start();
-    }
-
-    @Test
-    public void stop() throws Exception {
-        replicator.stop();
-        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
-    }
-
-    @Test
-    public void commitAddGetQueueTest() {
-        Message message = new ActiveMQTextMessage();
-        replicator.commit(message, false);
-        Assert.assertEquals(replicator.getQueue().poll(), message);
-    }
-
-    @Test
-    public void getConfigTest() {
-        Assert.assertEquals(replicator.getConfig(), config);
-    }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
deleted file mode 100644
index 72e818a..0000000
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jms.connector;
-
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.rocketmq.connect.jms.Config;
-import org.apache.rocketmq.connect.jms.Replicator;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.internal.DefaultKeyValue;
-
-public class ActivemqSourceTaskTest {
-
-    public void befores() throws JMSException, InterruptedException {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
-        Connection connection = connectionFactory.createConnection();
-
-        connection.start();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createQueue("test-queue");
-
-        MessageProducer producer = session.createProducer(destination);
-
-        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        for (int i = 0; i < 20; i++) {
-            TextMessage message = session.createTextMessage("hello 我是消息:" + i);
-            producer.send(message);
-        }
-
-        session.commit();
-        session.close();
-        connection.close();
-    }
-
-    //@Test
-    public void test() throws InterruptedException {
-        KeyValue kv = new DefaultKeyValue();
-        kv.put("activemqUrl", "tcp://112.74.48.251:6166");
-        kv.put("destinationType", "queue");
-        kv.put("destinationName", "test-queue");
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        task.start(kv);
-        for (int i = 0; i < 20; ) {
-            Collection<SourceDataEntry> sourceDataEntry = task.poll();
-            i = i + sourceDataEntry.size();
-            System.out.println(sourceDataEntry);
-        }
-        Thread.sleep(20000);
-    }
-
-    @Test
-    public void pollTest() throws Exception {
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        TextMessage textMessage = new ActiveMQTextMessage();
-        textMessage.setText("hello rocketmq");
-
-        Replicator replicatorObject = Mockito.mock(Replicator.class);
-        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
-        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
-
-        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
-        replicator.setAccessible(true);
-        replicator.set(task, replicatorObject);
-
-        Field config = ActivemqSourceTask.class.getDeclaredField("config");
-        config.setAccessible(true);
-        config.set(task, new Config());
-
-        queue.put(textMessage);
-        Collection<SourceDataEntry> list = task.poll();
-        Assert.assertEquals(list.size(), 1);
-
-        list = task.poll();
-        Assert.assertEquals(list.size(), 0);
-
-    }
-
-    @Test(expected = RuntimeException.class)
-    public void getMessageConnentTest() throws JMSException {
-        String value = "hello rocketmq";
-        ActivemqSourceTask task = new ActivemqSourceTask();
-        TextMessage textMessage = new ActiveMQTextMessage();
-        textMessage.setText(value);
-        ByteBuffer buffer = task.getMessageConnent(textMessage);
-        Assert.assertEquals(new String(buffer.array()), textMessage.getText());
-
-        ObjectMessage objectMessage = new ActiveMQObjectMessage();
-        objectMessage.setObject(value);
-        buffer = task.getMessageConnent(objectMessage);
-        Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
-
-        BytesMessage bytes = new ActiveMQBytesMessage();
-        bytes.writeBytes(value.getBytes());
-        bytes.reset();
-        buffer = task.getMessageConnent(bytes);
-        Assert.assertEquals(new String(buffer.array()), value);
-
-        MapMessage mapMessage = new ActiveMQMapMessage();
-        mapMessage.setString("hello", "rocketmq");
-        buffer = task.getMessageConnent(mapMessage);
-        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
-        Assert.assertEquals(map.get("hello"), "rocketmq");
-        Assert.assertEquals(map.size(), 1);
-
-        StreamMessage streamMessage = new ActiveMQStreamMessage();
-        String valueTwo = null;
-        for (int i = 0; i < 200; i++) {
-            valueTwo = valueTwo + value;
-        }
-        streamMessage.writeBytes(valueTwo.getBytes());
-        streamMessage.reset();
-        buffer = task.getMessageConnent(streamMessage);
-        Assert.assertEquals(new String(buffer.array()), valueTwo);
-
-        task.getMessageConnent(null);
-    }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
index 6c4029a..632c9d3 100644
--- a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -48,7 +48,7 @@ public class BaseJmsSourceConnectorTest {
 		}
 		
 		@Override
-		Set<String> getRequiredConfig() {
+		public Set<String> getRequiredConfig() {
 			return REQUEST_CONFIG;
 		}
 	};

[rocketmq-connect] 02/06: [ISSUE #302] Implement rocketmq connect jms (#303)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 78cc90208215b2f9a9998149be968a7b76575d36
Author: githublaohu <23...@qq.com>
AuthorDate: Fri Jun 21 15:38:21 2019 +0800

    [ISSUE #302] Implement rocketmq connect jms (#303)
---
 README-CN.md                                       |  16 ++
 README.md                                          |  15 ++
 pom.xml                                            | 201 +++++++++++++++++++++
 .../org/apache/rocketmq/connect/jms/Config.java    | 165 +++++++++++++++++
 .../org/apache/rocketmq/connect/jms/ErrorCode.java |   8 +
 .../apache/rocketmq/connect/jms/Replicator.java    |  67 +++++++
 .../jms/connector/BaseJmsSourceConnector.java      |  73 ++++++++
 .../connect/jms/connector/BaseJmsSourceTask.java   | 148 +++++++++++++++
 .../connect/jms/pattern/PatternProcessor.java      |  90 +++++++++
 .../rocketmq/connect/jms/ReplicatorTest.java       |  74 ++++++++
 .../jms/connector/ActivemqSourceTaskTest.java      | 164 +++++++++++++++++
 .../jms/connector/BaseJmsSourceConnectorTest.java  |  82 +++++++++
 12 files changed, 1103 insertions(+)

diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..be03683
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActiveConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|--- |--- | ---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 |  无
+activemq.password|  密码    | 否  | 无
+jms.destination.name | 读取的队列或者主题名   |  是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器    |  否  |无
+jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e15149e 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,16 @@
+##### ActiveConnector fully-qualified name
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
 
+
+##### parameter configuration
+
+parameter | effect | required |default
+---|--- |--- | ---
+activemq.url | The URL of the ActiveMQ broker | yes | null
+activemq.username | The username to use when connecting to ActiveMQ | no |  null
+activemq.password|  The password to use when connecting to ActiveMQ    | no  | null
+jms.destination.name | The name of the JMS destination (queue or topic) to read from   |  yes | null
+jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null
+jms.message.selector | The message selector that should be applied to messages in the destination    |  no  | null 
+jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session  | null | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | Flag to determine if the session is transacted and the session completely controls. the message delivery by either committing or rolling back the session      | null | false
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c7c3ba2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-jms</artifactId>
+    <version>1.0.0</version>
+
+    <name>connect-jms</name>
+    <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jms</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-all</artifactId>
+            <version>5.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>javax.jms-api</artifactId>
+            <version>2.0</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
new file mode 100644
index 0000000..e9cd178
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+import javax.jms.Session;
+
+public class Config {
+
+    @SuppressWarnings("serial")
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("brokerUrl");
+            add("destinationType");
+            add("destinationName");
+        }
+    };
+
+    public String brokerUrl;
+
+    public String username;
+
+    public String password;
+
+    public String destinationType;
+
+    public String destinationName;
+
+    public String messageSelector;
+
+    private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+    private Boolean sessionTransacted = Boolean.FALSE;
+
+    public void load(KeyValue props) {
+
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    
+    
+    public String getBrokerUrl() {
+		return brokerUrl;
+	}
+
+	public void setBrokerUrl(String brokerUrl) {
+		this.brokerUrl = brokerUrl;
+	}
+
+	public String getUsername() {
+		return username;
+	}
+
+	public void setUsername(String username) {
+		this.username = username;
+	}
+
+	public String getPassword() {
+		return password;
+	}
+
+	public void setPassword(String password) {
+		this.password = password;
+	}
+
+	public String getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public Integer getSessionAcknowledgeMode() {
+        return sessionAcknowledgeMode;
+    }
+
+    public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+        this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+    }
+
+    public Boolean getSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        this.sessionTransacted = sessionTransacted;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
new file mode 100644
index 0000000..b6dafb1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.jms;
+
+/**
+ * Numeric error codes attached to the {@code DataConnectException}s raised by
+ * the JMS source task.
+ */
+public class ErrorCode {
+
+    /** Used by {@code BaseJmsSourceTask.start} when the task fails to start. */
+    public static final int START_ERROR_CODE = 10001;
+
+    /** Used by {@code BaseJmsSourceTask.stop} when the task fails to stop. */
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
new file mode 100644
index 0000000..3e859a4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Message;
+
+import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bridges a {@link PatternProcessor} (the JMS consumer side) and a
+ * {@link BaseJmsSourceTask} (the polling side) through an in-memory queue.
+ *
+ * <p>The processor hands received messages to {@link #commit(Message, boolean)};
+ * the task drains them via {@link #getQueue()}.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    /** Created in {@link #start()}; stays {@code null} until then. */
+    private PatternProcessor processor;
+
+    private final Config config;
+
+    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+    private final BaseJmsSourceTask baseJmsSourceTask;
+
+    /**
+     * @param config            settings handed to the pattern processor
+     * @param baseJmsSourceTask task that supplies the concrete {@link PatternProcessor}
+     */
+    public Replicator(Config config, BaseJmsSourceTask baseJmsSourceTask) {
+        this.config = config;
+        this.baseJmsSourceTask = baseJmsSourceTask;
+    }
+
+    /**
+     * Obtains the pattern processor from the owning task and starts it.
+     *
+     * @throws Exception if the processor cannot be created or started
+     */
+    public void start() throws Exception {
+        processor = baseJmsSourceTask.getPatternProcessor(this);
+        processor.start();
+        LOGGER.info("Replicator start succeed");
+    }
+
+    /**
+     * Stops the pattern processor. Does nothing when no processor has been
+     * assigned yet (for example because {@link #start()} failed early).
+     *
+     * @throws Exception if the processor fails to stop
+     */
+    public void stop() throws Exception {
+        if (processor == null) {
+            return;
+        }
+        processor.stop();
+    }
+
+    /**
+     * Enqueues a received message for the source task to poll.
+     *
+     * @param message    message to enqueue
+     * @param isComplete currently unused; kept for interface compatibility
+     */
+    public void commit(Message message, boolean isComplete) {
+        queue.add(message);
+    }
+
+    public Config getConfig() {
+        return this.config;
+    }
+
+    public BlockingQueue<Message> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
new file mode 100644
index 0000000..f939881
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+
+public abstract class BaseJmsSourceConnector extends SourceConnector {
+
+    private KeyValue config;
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+    	
+        for (String requestKey : getRequiredConfig()) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override
+    public abstract Class<? extends Task> taskClass();
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.config);
+        return config;
+    }
+    
+    abstract Set<String> getRequiredConfig();
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
new file mode 100644
index 0000000..f43e7fb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.ErrorCode;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
+import io.openmessaging.connector.api.source.SourceTask;
+
+public abstract class BaseJmsSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(BaseJmsSourceTask.class);
+
+    private Replicator replicator;
+
+    private Config config;
+
+    private ByteBuffer sourcePartition;
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageContent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
+                res.add(sourceDataEntry);
+            }
+        } catch (Exception e) {
+            log.error("activemq task poll error, current config:" + JSON.toJSONString(config), e);
+        }
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8"));
+            this.replicator = new Replicator(config,this);
+            this.replicator.start();
+        } catch (Exception e) {
+            log.error("activemq task start failed.", e);
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("activemq task stop failed.", e);
+            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+        }
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @SuppressWarnings("unchecked")
+    public ByteBuffer getMessageContent(Message message) throws JMSException {
+        byte[] data = null;
+        if (message instanceof TextMessage) {
+            data = ((TextMessage) message).getText().getBytes();
+        } else if (message instanceof ObjectMessage) {
+            data = JSON.toJSONBytes(((ObjectMessage) message).getObject());
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            data = new byte[(int) bytesMessage.getBodyLength()];
+            bytesMessage.readBytes(data);
+        } else if (message instanceof MapMessage) {
+            MapMessage mapMessage = (MapMessage) message;
+            Map<String, Object> map = new HashMap<>();
+            Enumeration<Object> names = mapMessage.getMapNames();
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                map.put(name, mapMessage.getObject(name));
+            }
+            data = JSON.toJSONBytes(map);
+        } else if (message instanceof StreamMessage) {
+            StreamMessage streamMessage = (StreamMessage) message;
+            ByteArrayOutputStream bis = new ByteArrayOutputStream();
+            byte[] by = new byte[1024];
+            int i = 0;
+            while ((i = streamMessage.readBytes(by)) != -1) {
+                bis.write(by, 0, i);
+            }
+            data = bis.toByteArray();
+        } else {
+            // The exception is printed and does not need to be written as a DataConnectException
+            throw new RuntimeException("message type exception");
+        }
+        return ByteBuffer.wrap(data);
+    }
+    
+    public abstract PatternProcessor getPatternProcessor(Replicator replicator);
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
new file mode 100644
index 0000000..e03691f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+
+/**
+ * Consumes messages from a JMS topic or queue and hands each one to a
+ * {@link Replicator}.
+ *
+ * <p>Subclasses provide the broker-specific {@link ConnectionFactory}.
+ */
+public abstract class PatternProcessor {
+
+    private Replicator replicator;
+
+    Config config;
+
+    private Connection connection;
+
+    private Session session;
+
+    private MessageConsumer consumer;
+
+    /**
+     * @param replicator receiver of consumed messages; also the source of the
+     *                   configuration
+     */
+    public PatternProcessor(Replicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    /** @return the factory used to open the broker connection */
+    public abstract ConnectionFactory connectionFactory();
+
+    /**
+     * Opens the connection, session and consumer and registers a listener that
+     * forwards every message to the replicator.
+     *
+     * <p>If any step after the connection is created fails, the resources
+     * opened so far are closed before the exception is rethrown.
+     *
+     * @throws RuntimeException if the destination type is neither
+     *                          {@code "topic"} nor {@code "queue"}
+     * @throws Exception        if the JMS provider reports a failure
+     */
+    public void start() throws Exception {
+        final String destinationType = config.getDestinationType();
+        final boolean isTopic = StringUtils.equals("topic", destinationType);
+        if (!isTopic && !StringUtils.equals("queue", destinationType)) {
+            // BaseJmsSourceTask.start wraps this in a DataConnectException.
+            throw new RuntimeException("destination type is incorrectness");
+        }
+
+        ConnectionFactory connectionFactory = connectionFactory();
+
+        if (StringUtils.isNotBlank(config.getUsername())
+            && StringUtils.isNotBlank(config.getPassword())) {
+            connection = connectionFactory.createConnection(config.getUsername(), config.getPassword());
+        } else {
+            connection = connectionFactory.createConnection();
+        }
+
+        try {
+            connection.start();
+            // Assign the field (not a local) so that stop() can close the session.
+            session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+            Destination destination;
+            if (isTopic) {
+                destination = session.createTopic(config.getDestinationName());
+            } else {
+                destination = session.createQueue(config.getDestinationName());
+            }
+            consumer = session.createConsumer(destination, config.getMessageSelector());
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    replicator.commit(message, true);
+                }
+            });
+        } catch (Exception e) {
+            try {
+                stop();
+            } catch (Exception ignored) {
+                // The startup failure is the one worth reporting.
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the consumer, session and connection, skipping any that were never
+     * opened. All three are attempted even if one close fails.
+     *
+     * @throws Exception the first failure encountered while closing, if any
+     */
+    public void stop() throws Exception {
+        Exception firstFailure = null;
+
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (Exception e) {
+                firstFailure = e;
+            } finally {
+                consumer = null;
+            }
+        }
+
+        if (session != null) {
+            try {
+                session.close();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            } finally {
+                session = null;
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            } finally {
+                connection = null;
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
new file mode 100644
index 0000000..32b8f04
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms;
+
+import java.lang.reflect.Field;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.junit.Assert;
+
+/**
+ * Unit tests for {@link Replicator}. The replicator is built without a source
+ * task and a mocked {@link PatternProcessor} is injected reflectively.
+ */
+public class ReplicatorTest {
+
+    Replicator replicator;
+
+    PatternProcessor patternProcessor;
+
+    Config config;
+
+    @Before
+    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+        config = new Config();
+        replicator = new Replicator(config, null);
+
+        patternProcessor = Mockito.mock(PatternProcessor.class);
+
+        // Inject the mock directly; start() is not used to create the processor here.
+        Field processor = Replicator.class.getDeclaredField("processor");
+        processor.setAccessible(true);
+        processor.set(replicator, patternProcessor);
+    }
+
+    /** start() dereferences the source task, which is null in this fixture. */
+    @Test(expected = RuntimeException.class)
+    public void startTest() throws Exception {
+        replicator.start();
+    }
+
+    @Test
+    public void stop() throws Exception {
+        replicator.stop();
+        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+    }
+
+    @Test
+    public void commitAddGetQueueTest() {
+        Message message = new ActiveMQTextMessage();
+        replicator.commit(message, false);
+        // JUnit expects (expected, actual).
+        Assert.assertEquals(message, replicator.getQueue().poll());
+    }
+
+    @Test
+    public void getConfigTest() {
+        Assert.assertEquals(config, replicator.getConfig());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
new file mode 100644
index 0000000..72e818a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.rocketmq.connect.jms.Config;
+import org.apache.rocketmq.connect.jms.Replicator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.alibaba.fastjson.JSON;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Tests for the ActiveMQ flavour of the JMS source task.
+ *
+ * NOTE(review): this class uses {@code ActivemqSourceTask} and
+ * {@code getMessageConnent}, neither of which is defined in the visible part of
+ * this change ({@code BaseJmsSourceTask} names the method
+ * {@code getMessageContent}) - confirm that this file still compiles.
+ */
+public class ActivemqSourceTaskTest {
+
+    // Manual helper (not a JUnit test): publishes 20 text messages to "test-queue"
+    // on a hard-coded remote broker inside a transacted session.
+    public void befores() throws JMSException, InterruptedException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166");
+        Connection connection = connectionFactory.createConnection();
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("test-queue");
+
+        MessageProducer producer = session.createProducer(destination);
+
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        for (int i = 0; i < 20; i++) {
+            TextMessage message = session.createTextMessage("hello 我是消息:" + i);
+            producer.send(message);
+        }
+
+        session.commit();
+        session.close();
+        connection.close();
+    }
+
+    // Disabled integration test: needs the remote broker used by befores().
+    // NOTE(review): the key "activemqUrl" is not a property of Config, which
+    // exposes "brokerUrl" - confirm which key the task under test expects.
+    //@Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("activemqUrl", "tcp://112.74.48.251:6166");
+        kv.put("destinationType", "queue");
+        kv.put("destinationName", "test-queue");
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        task.start(kv);
+        for (int i = 0; i < 20; ) {
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            i = i + sourceDataEntry.size();
+            System.out.println(sourceDataEntry);
+        }
+        Thread.sleep(20000);
+    }
+
+    // poll() returns one entry while a message is queued and none afterwards.
+    // The replicator and config are injected reflectively so no broker is needed.
+    @Test
+    public void pollTest() throws Exception {
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText("hello rocketmq");
+
+        Replicator replicatorObject = Mockito.mock(Replicator.class);
+        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+        Mockito.when(replicatorObject.getQueue()).thenReturn(queue);
+
+        Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator");
+        replicator.setAccessible(true);
+        replicator.set(task, replicatorObject);
+
+        Field config = ActivemqSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        config.set(task, new Config());
+
+        queue.put(textMessage);
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(list.size(), 1);
+
+        list = task.poll();
+        Assert.assertEquals(list.size(), 0);
+
+    }
+
+    // Converts one message of each supported JMS type, then passes null, which is
+    // the call expected to raise the RuntimeException declared on @Test.
+    // NOTE(review): a RuntimeException thrown by any earlier statement would also
+    // satisfy the annotation.
+    @Test(expected = RuntimeException.class)
+    public void getMessageConnentTest() throws JMSException {
+        String value = "hello rocketmq";
+        ActivemqSourceTask task = new ActivemqSourceTask();
+        TextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setText(value);
+        ByteBuffer buffer = task.getMessageConnent(textMessage);
+        Assert.assertEquals(new String(buffer.array()), textMessage.getText());
+
+        ObjectMessage objectMessage = new ActiveMQObjectMessage();
+        objectMessage.setObject(value);
+        buffer = task.getMessageConnent(objectMessage);
+        Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\"");
+
+        BytesMessage bytes = new ActiveMQBytesMessage();
+        bytes.writeBytes(value.getBytes());
+        bytes.reset();
+        buffer = task.getMessageConnent(bytes);
+        Assert.assertEquals(new String(buffer.array()), value);
+
+        MapMessage mapMessage = new ActiveMQMapMessage();
+        mapMessage.setString("hello", "rocketmq");
+        buffer = task.getMessageConnent(mapMessage);
+        Map<String, String> map = JSON.parseObject(buffer.array(), Map.class);
+        Assert.assertEquals(map.get("hello"), "rocketmq");
+        Assert.assertEquals(map.size(), 1);
+
+        StreamMessage streamMessage = new ActiveMQStreamMessage();
+        // Starts as null, so the concatenated text begins with the literal "null".
+        String valueTwo = null;
+        for (int i = 0; i < 200; i++) {
+            valueTwo = valueTwo + value;
+        }
+        streamMessage.writeBytes(valueTwo.getBytes());
+        streamMessage.reset();
+        buffer = task.getMessageConnent(streamMessage);
+        Assert.assertEquals(new String(buffer.array()), valueTwo);
+
+        task.getMessageConnent(null);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
new file mode 100644
index 0000000..6c4029a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jms.connector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rocketmq.connect.jms.Config;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/**
+ * Unit tests for {@link BaseJmsSourceConnector}, exercised through a minimal
+ * anonymous subclass.
+ */
+public class BaseJmsSourceConnectorTest {
+
+    /**
+     * Required keys reported by the connector under test. This is the same set
+     * the tests populate, so validation and test data cannot drift apart.
+     */
+    public static final Set<String> REQUEST_CONFIG = Config.REQUEST_CONFIG;
+
+    BaseJmsSourceConnector connector = new BaseJmsSourceConnector() {
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return BaseJmsSourceTask.class;
+        }
+
+        @Override
+        Set<String> getRequiredConfig() {
+            return REQUEST_CONFIG;
+        }
+    };
+
+    /**
+     * Adds the required keys one at a time. Before each addition the connector
+     * must report exactly that key as missing; once all are present it must
+     * return an empty string.
+     */
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals("Request config key: " + requestKey, connector.verifyAndSetConfig(keyValue));
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(BaseJmsSourceTask.class, connector.taskClass());
+    }
+
+    /** The task config is null until a configuration has been accepted. */
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(null, connector.taskConfigs().get(0));
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(keyValue, connector.taskConfigs().get(0));
+    }
+}

[rocketmq-connect] 01/06: Initial rocketmq-connect-jms project

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 8b8c95f4df43b883f27a5ddf739305e55c61c08d
Author: duheng.dh <du...@alibaba-inc.com>
AuthorDate: Thu Jun 13 19:23:14 2019 +0800

    Initial rocketmq-connect-jms project
---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+

[rocketmq-connect] 06/06: Add 'connector/rocketmq-connect-jms/' from commit '37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d'

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit b3cbb0b49f361aeb83fc1ec2d26f00c1d6b6e449
Merge: 24c083e 37a4db7
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:20:19 2022 +0800

    Add 'connector/rocketmq-connect-jms/' from commit '37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d'
    
    git-subtree-dir: connector/rocketmq-connect-jms
    git-subtree-mainline: 24c083eccf9232b7ca7f18e0d0f236af2e68ba7f
    git-subtree-split: 37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d

 connector/rocketmq-connect-jms/pom.xml             | 196 +++++++++++++++++++++
 .../org/apache/rocketmq/connect/jms/Config.java    | 163 +++++++++++++++++
 .../org/apache/rocketmq/connect/jms/ErrorCode.java |   8 +
 .../apache/rocketmq/connect/jms/Replicator.java    |  67 +++++++
 .../jms/connector/BaseJmsSourceConnector.java      |  71 ++++++++
 .../connect/jms/connector/BaseJmsSourceTask.java   | 150 ++++++++++++++++
 .../connect/jms/pattern/PatternProcessor.java      |  90 ++++++++++
 .../jms/connector/BaseJmsSourceConnectorTest.java  |  82 +++++++++
 8 files changed, 827 insertions(+)

diff --cc connector/rocketmq-connect-jms/pom.xml
index 0000000,50e0490..50e0490
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/pom.xml
+++ b/connector/rocketmq-connect-jms/pom.xml
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Config.java
index 0000000,1e69f89..1e69f89
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Config.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Config.java
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
index 0000000,b6dafb1..b6dafb1
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
index 0000000,3e859a4..3e859a4
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
index 0000000,68f7677..68f7677
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
index 0000000,7b1f7c4..7b1f7c4
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java
diff --cc connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
index 0000000,5399045..5399045
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
+++ b/connector/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java
diff --cc connector/rocketmq-connect-jms/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
index 0000000,632c9d3..632c9d3
mode 000000,100644..100644
--- a/connector/rocketmq-connect-jms/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java
+++ b/connector/rocketmq-connect-jms/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java

[rocketmq-connect] 05/06: fix(connect-jms) should put .iml file in gitignore

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d
Author: affe <af...@gmail.com>
AuthorDate: Mon Jul 6 09:50:44 2020 +0800

    fix(connect-jms) should put .iml file in gitignore
---
 rocketmq-connect-jms.iml | 45 ---------------------------------------------
 1 file changed, 45 deletions(-)

diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml
deleted file mode 100644
index 0530e24..0000000
--- a/rocketmq-connect-jms.iml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
-  <component name="EclipseModuleManager">
-    <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" />
-    <src_description expected_position="0">
-      <src_folder value="file://$MODULE_DIR$/src/main/java" expected_position="0" />
-      <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" />
-    </src_description>
-  </component>
-  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
-    <output url="file://$MODULE_DIR$/target/classes" />
-    <output-test url="file://$MODULE_DIR$/target/test-classes" />
-    <content url="file://$MODULE_DIR$">
-      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
-      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
-      <excludeFolder url="file://$MODULE_DIR$/target" />
-    </content>
-    <orderEntry type="sourceFolder" forTests="false" />
-    <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" />
-    <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" />
-    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:2.6.0" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:2.6.3" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy:1.6.2" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy-agent:1.6.2" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:2.5" level="project" />
-    <orderEntry type="library" name="Maven: io.openmessaging:openmessaging-connector:0.1.0-beta" level="project" />
-    <orderEntry type="library" name="Maven: io.openmessaging:openmessaging-api:0.3.1-alpha" level="project" />
-    <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.51" level="project" />
-    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
-    <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.13" level="project" />
-    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.13" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-openmessaging:4.3.2" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-client:4.3.2" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-common:4.3.2" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-remoting:4.3.2" level="project" />
-    <orderEntry type="library" name="Maven: io.netty:netty-all:4.0.42.Final" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-logging:4.3.2" level="project" />
-    <orderEntry type="library" name="Maven: io.netty:netty-tcnative-boringssl-static:1.1.33.Fork26" level="project" />
-    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" />
-    <orderEntry type="library" name="Maven: commons-cli:commons-cli:1.2" level="project" />
-    <orderEntry type="library" name="Maven: javax.jms:javax.jms-api:2.0" level="project" />
-  </component>
-</module>
\ No newline at end of file

[rocketmq-connect] 04/06: fix(connect-runtime) remove unused local variable

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 4e3ccda623afd91aa62487dc010136e58ba8f8b4
Author: affe <af...@gmail.com>
AuthorDate: Thu Apr 9 11:19:15 2020 -0700

    fix(connect-runtime) remove unused local variable
---
 rocketmq-connect-jms.iml | 32 ++++++++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/rocketmq-connect-jms.iml b/rocketmq-connect-jms.iml
index b187d53..0530e24 100644
--- a/rocketmq-connect-jms.iml
+++ b/rocketmq-connect-jms.iml
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
   <component name="EclipseModuleManager">
     <conelement value="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" />
     <src_description expected_position="0">
@@ -7,15 +7,39 @@
       <src_folder value="file://$MODULE_DIR$/src/test/java" expected_position="1" />
     </src_description>
   </component>
-  <component name="NewModuleRootManager">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
     <output url="file://$MODULE_DIR$/target/classes" />
-    <exclude-output />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
     <content url="file://$MODULE_DIR$">
       <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
-      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
     </content>
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="jdk" jdkName="JavaSE-1.8" jdkType="JavaSDK" />
     <orderEntry type="library" name="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER" level="application" />
+    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:2.6.0" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:2.6.3" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy:1.6.2" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy-agent:1.6.2" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:2.5" level="project" />
+    <orderEntry type="library" name="Maven: io.openmessaging:openmessaging-connector:0.1.0-beta" level="project" />
+    <orderEntry type="library" name="Maven: io.openmessaging:openmessaging-api:0.3.1-alpha" level="project" />
+    <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.51" level="project" />
+    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.13" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.13" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-openmessaging:4.3.2" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-client:4.3.2" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-common:4.3.2" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-remoting:4.3.2" level="project" />
+    <orderEntry type="library" name="Maven: io.netty:netty-all:4.0.42.Final" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.rocketmq:rocketmq-logging:4.3.2" level="project" />
+    <orderEntry type="library" name="Maven: io.netty:netty-tcnative-boringssl-static:1.1.33.Fork26" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" />
+    <orderEntry type="library" name="Maven: commons-cli:commons-cli:1.2" level="project" />
+    <orderEntry type="library" name="Maven: javax.jms:javax.jms-api:2.0" level="project" />
   </component>
 </module>
\ No newline at end of file