You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:15 UTC

[rocketmq-connect] 04/12: change

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit da0b21fb846de119897f6086737be5de34f1cbac
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 6 14:36:18 2019 +0800

    change
---
 README-CN.md                                       | 16 +++++++++
 README.md                                          | 16 ++++++++-
 .../apache/rocketmq/connect/activemq/Config.java   | 38 ++++++++++++++++++++--
 .../rocketmq/connect/activemq/Replicator.java      |  3 --
 .../connect/activemq/connector/ActivemqTask.java   | 10 ++++--
 .../connect/activemq/pattern/PatternProcessor.java | 16 ++++-----
 6 files changed, 81 insertions(+), 18 deletions(-)

diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..bd745e2
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActiveConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|---|---|---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 |  无
+activemq.password|  密码    | 否  | 无
+jms.destination.name | 读取的队列或者主题名   |  是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器    |  否  |无
+jms.session.acknowledge.mode | 消息确认  | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话      | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e965865 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,15 @@
-
+参数 | 作用 | 
+---|---
+activemq.url | activemq ip与端口号
+activemq.username | 用户名
+activemq.password|  密码
+jms.destination.name | 读取的队列或者主题名
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题)
+jms.message.selector | 过滤器
+jms.session.acknowledge.mode | 消息确认
+jms.session.transacted | 是否是事务会话
+rocketmq.topic        | 发送的topic
+rocketmq.name         | broker的用户名
+rocketmq.sk |           
+rocketmq.ak |
+rocketmq.nameserver |  nameserver url
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 15a95a7..af218c0 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -21,15 +21,16 @@ import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
 
+import javax.jms.Session;
+
 import io.openmessaging.KeyValue;
 
 public class Config {
 
+	@SuppressWarnings("serial")
 	public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
 		{
 			add("activemqUrl");
-			add("activemqUsername");
-			add("activemqPassword");
 			add("destinationType");
 			add("destinationName");
 		}
@@ -44,7 +45,13 @@ public class Config {
 	public String destinationType;
 
 	public String destinationName;
+	
+	public String messageSelector;
+	
+	private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
 
+	private Boolean sessionTransacted = Boolean.FALSE;
+	
 	public void load(KeyValue props) {
 
 		properties2Object(props, this);
@@ -130,4 +137,31 @@ public class Config {
 	public void setDestinationName(String destinationName) {
 		this.destinationName = destinationName;
 	}
+
+	public String getMessageSelector() {
+		return messageSelector;
+	}
+
+	public void setMessageSelector(String messageSelector) {
+		this.messageSelector = messageSelector;
+	}
+
+	public Integer getSessionAcknowledgeMode() {
+		return sessionAcknowledgeMode;
+	}
+
+	public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+		this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+	}
+
+	public Boolean getSessionTransacted() {
+		return sessionTransacted;
+	}
+
+	public void setSessionTransacted(Boolean sessionTransacted) {
+		this.sessionTransacted = sessionTransacted;
+	}
+	
+	
+	
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 499beb0..dd83c37 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -30,12 +30,9 @@ public class Replicator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
 
-    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
-
     private PatternProcessor processor;
     
     private Config config;
-    private Object lock = new Object();
     private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
 
     public Replicator(Config config){
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index f871be5..9743a19 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.activemq.connector;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
 
@@ -50,7 +52,11 @@ public class ActivemqTask extends SourceTask {
 
         try {
         	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-            SourceDataEntry sourceDataEntry = null;
+        	DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null);
+        	dataEntryBuilder.timestamp(System.currentTimeMillis());
+            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                    ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")),
+                    ByteBuffer.wrap(JSON.toJSONBytes(message)));
             
             res.add(sourceDataEntry);
         } catch (Exception e) {
@@ -66,10 +72,10 @@ public class ActivemqTask extends SourceTask {
             this.config = new Config();
             this.config.load(props);
             this.replicator = new Replicator(config);
-            this.replicator.start();
         } catch (Exception e) {
             log.error("Mysql task start failed.", e);
         }
+        this.replicator.start();
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index c1b282c..b26bfb9 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -32,31 +32,27 @@ public class PatternProcessor {
 	}
 	
 	public void start() {
+		if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) {
+			throw new RuntimeException("destination type is incorrectness");
+		}
+		
 		try {
 		   ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
 		   
-	        //2、使用连接工厂创建一个连接对象
 		   if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) {
 	           connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword());
 		   }else {
 			   connection = connectionFactory.createConnection();
 		   }
-	        //3、开启连接
 	        connection.start();
-	        //4、使用连接对象创建会话(session)对象
-	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-	        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
+	        Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
 	        Destination destination = null;
 	        if(StringUtils.equals("topic", config.getDestinationType())) {
 	        	destination = session.createTopic(config.getDestinationName());
 	        }else if(StringUtils.equals("queue", config.getDestinationType())){
 	        	destination = session.createQueue(config.getDestinationName());
-	        }else {
-	        	throw new RuntimeException("");
 	        }
-	        consumer = session.createConsumer(destination);
-	        //6、使用会话对象创建生产者对象
-	        //7、向consumer对象中设置一个messageListener对象,用来接收消息
+	        consumer = session.createConsumer(destination, config.getMessageSelector());
 	        consumer.setMessageListener(new MessageListener() {
 	            @Override
 	            public void onMessage(Message message) {