You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:20 UTC
[rocketmq-connect] 09/12: change exception
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit db391c5c580d1d251aff442d994c9932af1a35c6
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 13 10:12:29 2019 +0800
change exception
---
pom.xml | 5 --
.../rocketmq/connect/activemq/ErrorCode.java | 8 +++
.../rocketmq/connect/activemq/Replicator.java | 18 +++---
.../activemq/connector/ActivemqSourceTask.java | 21 +++++--
.../connect/activemq/pattern/PatternProcessor.java | 66 ++++++++++------------
.../rocketmq/connect/activemq/ReplicatorTest.java | 4 +-
6 files changed, 64 insertions(+), 58 deletions(-)
diff --git a/pom.xml b/pom.xml
index 6e933bd..473a663 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,11 +156,6 @@
<version>0.1.0-beta</version>
</dependency>
<dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-api</artifactId>
- <version>0.3.1-alpha</version>
- </dependency>
- <dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
new file mode 100644
index 0000000..de3b3f5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.activemq;
+
+public class ErrorCode {
+
+ public static final int START_ERROR_CODE = 10001;
+
+ public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 2b18481..e0ebe12 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+
import javax.jms.Message;
+
import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +39,13 @@ public class Replicator {
this.config = config;
}
- public void start() {
-
- try {
- processor = new PatternProcessor(this);
- processor.start();
-
- } catch (Exception e) {
- LOGGER.error("Start error.", e);
- throw new RuntimeException(e);
- }
+ public void start() throws Exception {
+ processor = new PatternProcessor(this);
+ processor.start();
+ LOGGER.info("Replicator start succeed");
}
- public void stop() {
+ public void stop() throws Exception {
processor.stop();
}
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 2140b5f..9d1a1aa 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq.connector;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
import io.openmessaging.connector.api.source.SourceTask;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
@@ -38,6 +40,7 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.ErrorCode;
import org.apache.rocketmq.connect.activemq.Replicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +61,8 @@ public class ActivemqSourceTask extends SourceTask {
try {
Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
if (message != null) {
- SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+ Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageConnent(message)};
+ SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
res.add(sourceDataEntry);
}
} catch (Exception e) {
@@ -74,15 +78,21 @@ public class ActivemqSourceTask extends SourceTask {
this.config.load(props);
this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
this.replicator = new Replicator(config);
+ this.replicator.start();
} catch (Exception e) {
- log.error("Mysql task start failed.", e);
+ log.error("activemq task start failed.", e);
+ throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
}
- this.replicator.start();
}
@Override
public void stop() {
- replicator.stop();
+ try {
+ replicator.stop();
+ } catch (Exception e) {
+ log.error("activemq task stop failed.", e);
+ throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+ }
}
@Override public void pause() {
@@ -123,8 +133,9 @@ public class ActivemqSourceTask extends SourceTask {
}
data = bis.toByteArray();
} else {
+ // This exception is only logged, so it does not need to be wrapped in a DataConnectException
throw new RuntimeException("message type exception");
}
- return data != null ? ByteBuffer.wrap(data) : null;
+ return ByteBuffer.wrap(data);
}
}
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index b0836f9..6e39a7e 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -20,11 +20,11 @@ package org.apache.rocketmq.connect.activemq.pattern;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.activemq.Config;
@@ -47,47 +47,43 @@ public class PatternProcessor {
this.config = replicator.getConfig();
}
- public void start() {
- if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) {
+ public void start() throws Exception {
+ if (!StringUtils.equals("topic", config.getDestinationType())
+ && !StringUtils.equals("queue", config.getDestinationType())) {
+ // This RuntimeException is caught by ActivemqSourceTask.start() and rethrown as a DataConnectException
throw new RuntimeException("destination type is incorrectness");
}
- try {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
- if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) {
- connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
- } else {
- connection = connectionFactory.createConnection();
- }
- connection.start();
- Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
- Destination destination = null;
- if (StringUtils.equals("topic", config.getDestinationType())) {
- destination = session.createTopic(config.getDestinationName());
- } else if (StringUtils.equals("queue", config.getDestinationType())) {
- destination = session.createQueue(config.getDestinationName());
- }
- consumer = session.createConsumer(destination, config.getMessageSelector());
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- replicator.commit(message, true);
- }
- });
- } catch (Exception e) {
- throw new RuntimeException(e);
+ if (StringUtils.isNotBlank(config.getActivemqUsername())
+ && StringUtils.isNotBlank(config.getActivemqPassword())) {
+ connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
+ } else {
+ connection = connectionFactory.createConnection();
+ }
+ connection.start();
+ Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+ Destination destination = null;
+ if (StringUtils.equals("topic", config.getDestinationType())) {
+ destination = session.createTopic(config.getDestinationName());
+ } else if (StringUtils.equals("queue", config.getDestinationType())) {
+ destination = session.createQueue(config.getDestinationName());
}
+ consumer = session.createConsumer(destination, config.getMessageSelector());
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ replicator.commit(message, true);
+ }
+ });
+
}
- public void stop() {
- try {
- consumer.close();
- session.close();
- connection.close();
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
+ public void stop() throws Exception {
+ consumer.close();
+ session.close();
+ connection.close();
}
}
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index 28cafa4..b94237e 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -50,12 +50,12 @@ public class ReplicatorTest {
}
@Test(expected = RuntimeException.class)
- public void startTest() {
+ public void startTest() throws Exception {
replicator.start();
}
@Test
- public void stop() {
+ public void stop() throws Exception {
replicator.stop();
Mockito.verify(patternProcessor, Mockito.times(1)).stop();
}