You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:15 UTC
[rocketmq-connect] 04/12: change
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit da0b21fb846de119897f6086737be5de34f1cbac
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 6 14:36:18 2019 +0800
change
---
README-CN.md | 16 +++++++++
README.md | 16 ++++++++-
.../apache/rocketmq/connect/activemq/Config.java | 38 ++++++++++++++++++++--
.../rocketmq/connect/activemq/Replicator.java | 3 --
.../connect/activemq/connector/ActivemqTask.java | 10 ++++--
.../connect/activemq/pattern/PatternProcessor.java | 16 ++++-----
6 files changed, 81 insertions(+), 18 deletions(-)
diff --git a/README-CN.md b/README-CN.md
new file mode 100644
index 0000000..bd745e2
--- /dev/null
+++ b/README-CN.md
@@ -0,0 +1,16 @@
+##### ActivemqConnector完全限定名
+org.apache.rocketmq.connect.activemq.connector.ActivemqConnector
+
+
+##### 配置参数
+
+参数 | 作用 | 是否必填 | 默认值
+---|---|---|---
+activemq.url | activemq ip与端口号 | 是 | 无
+activemq.username | 用户名 | 否 | 无
+activemq.password| 密码 | 否 | 无
+jms.destination.name | 读取的队列或者主题名 | 是 | 无
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无
+jms.message.selector | 过滤器 | 否 |无
+jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE
+jms.session.transacted | 是否是事务会话 | 否 | false
diff --git a/README.md b/README.md
index 8b13789..e965865 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,15 @@
-
+参数 | 作用
+---|---
+activemq.url | activemq ip与端口号
+activemq.username | 用户名
+activemq.password| 密码
+jms.destination.name | 读取的队列或者主题名
+jms.destination.type | 读取的类型:queue(队列)或者topic(主题)
+jms.message.selector | 过滤器
+jms.session.acknowledge.mode | 消息确认
+jms.session.transacted | 是否是事务会话
+rocketmq.topic | 发送的topic
+rocketmq.name | broker的用户名
+rocketmq.sk |
+rocketmq.ak |
+rocketmq.nameserver | nameserver url
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
index 15a95a7..af218c0 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java
@@ -21,15 +21,16 @@ import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
+import javax.jms.Session;
+
import io.openmessaging.KeyValue;
public class Config {
+ @SuppressWarnings("serial")
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add("activemqUrl");
- add("activemqUsername");
- add("activemqPassword");
add("destinationType");
add("destinationName");
}
@@ -44,7 +45,13 @@ public class Config {
public String destinationType;
public String destinationName;
+
+ public String messageSelector;
+
+ private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ private Boolean sessionTransacted = Boolean.FALSE;
+
public void load(KeyValue props) {
properties2Object(props, this);
@@ -130,4 +137,31 @@ public class Config {
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
+
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ public void setMessageSelector(String messageSelector) {
+ this.messageSelector = messageSelector;
+ }
+
+ public Integer getSessionAcknowledgeMode() {
+ return sessionAcknowledgeMode;
+ }
+
+ public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
+ this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+ }
+
+ public Boolean getSessionTransacted() {
+ return sessionTransacted;
+ }
+
+ public void setSessionTransacted(Boolean sessionTransacted) {
+ this.sessionTransacted = sessionTransacted;
+ }
+
+
+
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 499beb0..dd83c37 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -30,12 +30,9 @@ public class Replicator {
private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
- private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
-
private PatternProcessor processor;
private Config config;
- private Object lock = new Object();
private BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
public Replicator(Config config){
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index f871be5..9743a19 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.activemq.connector;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
@@ -50,7 +52,11 @@ public class ActivemqTask extends SourceTask {
try {
Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
- SourceDataEntry sourceDataEntry = null;
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null);
+ dataEntryBuilder.timestamp(System.currentTimeMillis());
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")),
+ ByteBuffer.wrap(JSON.toJSONBytes(message)));
res.add(sourceDataEntry);
} catch (Exception e) {
@@ -66,10 +72,10 @@ public class ActivemqTask extends SourceTask {
this.config = new Config();
this.config.load(props);
this.replicator = new Replicator(config);
- this.replicator.start();
} catch (Exception e) {
log.error("Mysql task start failed.", e);
}
+ this.replicator.start();
}
@Override
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index c1b282c..b26bfb9 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -32,31 +32,27 @@ public class PatternProcessor {
}
public void start() {
+ if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) {
+ throw new RuntimeException("destination type is incorrectness");
+ }
+
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
- //2、使用连接工厂创建一个连接对象
if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) {
connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword());
}else {
connection = connectionFactory.createConnection();
}
- //3、开启连接
connection.start();
- //4、使用连接对象创建会话(session)对象
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
+ Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
Destination destination = null;
if(StringUtils.equals("topic", config.getDestinationType())) {
destination = session.createTopic(config.getDestinationName());
}else if(StringUtils.equals("queue", config.getDestinationType())){
destination = session.createQueue(config.getDestinationName());
- }else {
- throw new RuntimeException("");
}
- consumer = session.createConsumer(destination);
- //6、使用会话对象创建生产者对象
- //7、向consumer对象中设置一个messageListener对象,用来接收消息
+ consumer = session.createConsumer(destination, config.getMessageSelector());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {