You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 13:45:20 UTC

[rocketmq-connect] 09/12: change exception

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit db391c5c580d1d251aff442d994c9932af1a35c6
Author: laohu <23...@qq.com>
AuthorDate: Thu Jun 13 10:12:29 2019 +0800

    change exception
---
 pom.xml                                            |  5 --
 .../rocketmq/connect/activemq/ErrorCode.java       |  8 +++
 .../rocketmq/connect/activemq/Replicator.java      | 18 +++---
 .../activemq/connector/ActivemqSourceTask.java     | 21 +++++--
 .../connect/activemq/pattern/PatternProcessor.java | 66 ++++++++++------------
 .../rocketmq/connect/activemq/ReplicatorTest.java  |  4 +-
 6 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6e933bd..473a663 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,11 +156,6 @@
 			<version>0.1.0-beta</version>
 		</dependency>
 		<dependency>
-			<groupId>io.openmessaging</groupId>
-			<artifactId>openmessaging-api</artifactId>
-			<version>0.3.1-alpha</version>
-		</dependency>
-		<dependency>
 			<groupId>com.alibaba</groupId>
 			<artifactId>fastjson</artifactId>
 			<version>1.2.51</version>
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
new file mode 100644
index 0000000..de3b3f5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java
@@ -0,0 +1,8 @@
+package org.apache.rocketmq.connect.activemq;
+
+public class ErrorCode {
+
+    public static final int START_ERROR_CODE = 10001;
+
+    public static final int STOP_ERROR_CODE = 10002;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
index 2b18481..e0ebe12 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import javax.jms.Message;
+
 import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,19 +39,13 @@ public class Replicator {
         this.config = config;
     }
 
-    public void start() {
-
-        try {
-            processor = new PatternProcessor(this);
-            processor.start();
-
-        } catch (Exception e) {
-            LOGGER.error("Start error.", e);
-            throw new RuntimeException(e);
-        }
+    public void start() throws Exception {
+        processor = new PatternProcessor(this);
+        processor.start();
+        LOGGER.info("Replicator start succeed");
     }
 
-    public void stop() {
+    public void stop() throws Exception {
         processor.stop();
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
index 2140b5f..9d1a1aa 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq.connector;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.DataConnectException;
 import io.openmessaging.connector.api.source.SourceTask;
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
@@ -38,6 +40,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import org.apache.rocketmq.connect.activemq.Config;
+import org.apache.rocketmq.connect.activemq.ErrorCode;
 import org.apache.rocketmq.connect.activemq.Replicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +61,8 @@ public class ActivemqSourceTask extends SourceTask {
         try {
             Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             if (message != null) {
-                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null);
+                Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageConnent(message)};
+                SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), EntryType.CREATE, null, null, payload);
                 res.add(sourceDataEntry);
             }
         } catch (Exception e) {
@@ -74,15 +78,21 @@ public class ActivemqSourceTask extends SourceTask {
             this.config.load(props);
             this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8"));
             this.replicator = new Replicator(config);
+            this.replicator.start();
         } catch (Exception e) {
-            log.error("Mysql task start failed.", e);
+            log.error("activemq task start failed.", e);
+            throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e);
         }
-        this.replicator.start();
     }
 
     @Override
     public void stop() {
-        replicator.stop();
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("activemq task stop failed.", e);
+            throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e);
+        }
     }
 
     @Override public void pause() {
@@ -123,8 +133,9 @@ public class ActivemqSourceTask extends SourceTask {
             }
             data = bis.toByteArray();
         } else {
+            // This exception is only logged, so it does not need to be wrapped in a DataConnectException
             throw new RuntimeException("message type exception");
         }
-        return data != null ? ByteBuffer.wrap(data) : null;
+        return ByteBuffer.wrap(data);
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
index b0836f9..6e39a7e 100644
--- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
+++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java
@@ -20,11 +20,11 @@ package org.apache.rocketmq.connect.activemq.pattern;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.activemq.Config;
@@ -47,47 +47,43 @@ public class PatternProcessor {
         this.config = replicator.getConfig();
     }
 
-    public void start() {
-        if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) {
+    public void start() throws Exception {
+        if (!StringUtils.equals("topic", config.getDestinationType())
+            && !StringUtils.equals("queue", config.getDestinationType())) {
+            // This RuntimeException is caught in ActivemqSourceTask.start and rethrown as a DataConnectException
             throw new RuntimeException("destination type is incorrectness");
         }
 
-        try {
-            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl());
 
-            if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) {
-                connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
-            } else {
-                connection = connectionFactory.createConnection();
-            }
-            connection.start();
-            Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
-            Destination destination = null;
-            if (StringUtils.equals("topic", config.getDestinationType())) {
-                destination = session.createTopic(config.getDestinationName());
-            } else if (StringUtils.equals("queue", config.getDestinationType())) {
-                destination = session.createQueue(config.getDestinationName());
-            }
-            consumer = session.createConsumer(destination, config.getMessageSelector());
-            consumer.setMessageListener(new MessageListener() {
-                @Override
-                public void onMessage(Message message) {
-                    replicator.commit(message, true);
-                }
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        if (StringUtils.isNotBlank(config.getActivemqUsername())
+            && StringUtils.isNotBlank(config.getActivemqPassword())) {
+            connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword());
+        } else {
+            connection = connectionFactory.createConnection();
+        }
+        connection.start();
+        Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode());
+        Destination destination = null;
+        if (StringUtils.equals("topic", config.getDestinationType())) {
+            destination = session.createTopic(config.getDestinationName());
+        } else if (StringUtils.equals("queue", config.getDestinationType())) {
+            destination = session.createQueue(config.getDestinationName());
         }
+        consumer = session.createConsumer(destination, config.getMessageSelector());
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                replicator.commit(message, true);
+            }
+        });
+
     }
 
-    public void stop() {
-        try {
-            consumer.close();
-            session.close();
-            connection.close();
-        } catch (JMSException e) {
-            throw new RuntimeException(e);
-        }
+    public void stop() throws Exception {
+        consumer.close();
+        session.close();
+        connection.close();
     }
 
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
index 28cafa4..b94237e 100644
--- a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java
@@ -50,12 +50,12 @@ public class ReplicatorTest {
     }
 
     @Test(expected = RuntimeException.class)
-    public void startTest() {
+    public void startTest() throws Exception {
         replicator.start();
     }
 
     @Test
-    public void stop() {
+    public void stop() throws Exception {
         replicator.stop();
         Mockito.verify(patternProcessor, Mockito.times(1)).stop();
     }