You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:16 UTC

[rocketmq-connect] 05/12: change

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 666376814f146732aec5be3251e4f75f170654cb
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 6 19:36:31 2019 +0800

    change
---
 .../connect/activemq/connector/ActivemqTask.java   |  6 +--
 .../activemq/connector/ActivemqTaskTest.java       | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
index 9743a19..c2950fa 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java
@@ -52,11 +52,7 @@ public class ActivemqTask extends SourceTask {
 
         try {
         	Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
-        	DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null);
-        	dataEntryBuilder.timestamp(System.currentTimeMillis());
-            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                    ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")),
-                    ByteBuffer.wrap(JSON.toJSONBytes(message)));
+            SourceDataEntry sourceDataEntry = new SourceDataEntry(ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), ByteBuffer.wrap(JSON.toJSONBytes(message)), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
             
             res.add(sourceDataEntry);
         } catch (Exception e) {
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
new file mode 100644
index 0000000..5dcb6a7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTaskTest.java
@@ -0,0 +1,63 @@
+package org.apache.rocketmq.connect.activemq.connector;
+
+import java.util.Collection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.rocketmq.connect.activemq.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+/** Integration test: publishes messages to a live ActiveMQ broker, then polls ActivemqTask until all are seen. */
+public class ActivemqTaskTest {
+	/** Broker URL; override with -Dactivemq.url=... (defaults to the address this test originally hard-coded). */
+	private static final String BROKER_URL = System.getProperty("activemq.url", "tcp://112.74.48.251:6166");
+	private static final String QUEUE_NAME = "test-queue";
+	private static final int MESSAGE_COUNT = 20;
+	/** Publishes MESSAGE_COUNT text messages in one transaction, closing the connection even if a JMS call fails. */
+	@Before
+	public void befores() throws JMSException, InterruptedException {
+		Connection connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+			for (int i = 0; i < MESSAGE_COUNT; i++) {
+				producer.send(session.createTextMessage("hello 我是消息:" + i));
+			}
+			session.commit();
+		} finally {
+			connection.close(); // closing a JMS connection also closes its sessions and producers
+		}
+	}
+
+	/** Polls the task until MESSAGE_COUNT entries arrive; the timeout fails the test instead of hanging forever. */
+	@Test(timeout = 120000)
+	public void test() throws InterruptedException {
+		KeyValue kv = new DefaultKeyValue();
+		kv.put("activemqUrl", BROKER_URL);
+		kv.put("destinationType", "queue");
+		kv.put("destinationName", QUEUE_NAME);
+		ActivemqTask task = new ActivemqTask();
+		task.start(kv);
+		for (int received = 0; received < MESSAGE_COUNT;) {
+			Collection<SourceDataEntry> entries = task.poll();
+			received += entries.size();
+			System.out.println(entries);
+		}
+		Thread.sleep(20000); // NOTE(review): kept from the original; presumably lets the task settle - confirm it is needed
+	}
+}