You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:17 UTC
[rocketmq-connect] 06/12: message type
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d50ffb8ed6ddef82e24a4071c03a96ee3e4f1c84
Author: laohu <23...@qq.com>
AuthorDate: Tue Jun 11 08:36:05 2019 +0800
message type
---
pom.xml | 5 --
.../connect/activemq/connector/ActivemqTask.java | 63 +++++++++++++++++++---
.../activemq/connector/ActivemqTaskTest.java | 5 ++
3 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index f22c291..6e933bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,11 +152,6 @@
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-connect-runtime</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.0-beta</version>
</dependency>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index c2950fa..7dead7b 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -17,13 +17,24 @@
package org.apache.rocketmq.connect.activemq.connector;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
import org.apache.rocketmq.connect.activemq.Config;
import org.apache.rocketmq.connect.activemq.Replicator;
@@ -33,7 +44,6 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
@@ -44,17 +54,19 @@ public class ActivemqTask extends SourceTask {
private Replicator replicator;
private Config config;
+
+ private ByteBuffer sourcePartition;
- @Override
+
+ @Override
public Collection<SourceDataEntry> poll() {
-
List<SourceDataEntry> res = new ArrayList<>();
-
try {
Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
- SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
-
- res.add(sourceDataEntry);
+ if(message != null) {
+ SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+ res.add(sourceDataEntry);
+ }
} catch (Exception e) {
log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e);
}
@@ -63,10 +75,10 @@ public class ActivemqTask extends SourceTask {
@Override
public void start(KeyValue props) {
-
try {
this.config = new Config();
this.config.load(props);
+ this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
this.replicator = new Replicator(config);
} catch (Exception e) {
log.error("Mysql task start failed.", e);
@@ -86,4 +98,39 @@ public class ActivemqTask extends SourceTask {
@Override public void resume() {
}
+
+ @SuppressWarnings("unchecked")
+ public ByteBuffer getMessageConnent(Message message ) throws JMSException {
+ byte[] data = null;
+ if(message instanceof TextMessage) {
+ data = ((TextMessage) message).getText().getBytes();
+ }else if(message instanceof ObjectMessage) {
+ data = JSON.toJSONBytes( ((ObjectMessage) message).getObject());
+ }else if(message instanceof BytesMessage) {
+ BytesMessage bytesMessage = (BytesMessage)message;
+ data = new byte[(int) bytesMessage.getBodyLength()];
+ bytesMessage.readBytes(data);
+ }else if(message instanceof MapMessage) {
+ MapMessage mapMessage = (MapMessage)message;
+ Map<String,Object> map = new HashMap<>();
+ Enumeration<Object> names = mapMessage.getMapNames();
+ while(names.hasMoreElements()) {
+ String name = names.nextElement().toString();
+ map.put(name, mapMessage.getObject(name));
+ }
+ data = JSON.toJSONBytes(map);
+ }else if(message instanceof StreamMessage) {
+ StreamMessage streamMessage = (StreamMessage)message;
+ ByteArrayOutputStream bis = new ByteArrayOutputStream();
+ byte[] by = new byte[1024];
+ int i = 0;
+ while( (i = streamMessage.readBytes(by)) != 0) {
+ bis.write(by, 0, i);
+ }
+ data = bis.toByteArray();
+ }else {
+ throw new RuntimeException("message type exception");
+ }
+ return data!=null ? ByteBuffer.wrap( data ) : null;
+ }
}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
index 5dcb6a7..780cbc9 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
@@ -45,6 +45,11 @@ public class ActivemqTaskTest {
}
@Test
+ public void nullTest() {
+
+ }
+
+ @Test
public void test() throws InterruptedException {
KeyValue kv = new DefaultKeyValue();
kv.put("activemqUrl", "tcp://112.74.48.251:6166");