You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/27 06:09:49 UTC

[rocketmq-connect] branch master updated: [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 732f127  [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
732f127 is described below

commit 732f1274ddd75312f9750e802167e51fe19e66aa
Author: xiaoyi <su...@163.com>
AuthorDate: Fri May 27 14:09:46 2022 +0800

    [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
    
    Co-authored-by: “sunxiaojian” <“sunxiaojian926@163.com”>
---
 .../connect/runtime/connectorwrapper/TransformChain.java  | 15 ++++++++++++++-
 .../connect/runtime/connectorwrapper/WorkerSinkTask.java  |  5 +++++
 .../runtime/connectorwrapper/WorkerSourceTask.java        |  5 +++++
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 3de0222..485297a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,6 +23,8 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.internal.DefaultKeyValue;
+
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -35,7 +37,7 @@ import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TransformChain<R extends ConnectRecord> {
+public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
@@ -122,4 +124,15 @@ public class TransformChain<R extends ConnectRecord> {
         Plugin.compareAndSwapLoaders(currentThreadLoader);
         return transform;
     }
+
+    /**
+     * close transforms
+     * @throws Exception if this resource cannot be closed
+     */
+    @Override
+    public void close() throws Exception {
+        for (Transform transform : transformList) {
+            transform.stop();
+        }
+    }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 8f95747..40db678 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -502,6 +502,11 @@ public class WorkerSinkTask implements WorkerTask {
     @Override
     public void stop() {
         state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+        try {
+            transformChain.close();
+        } catch (Exception exception) {
+            log.error("Transform close failed,{}", exception);
+        }
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 8e62ca3..2743481 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -246,6 +246,11 @@ public class WorkerSourceTask implements WorkerTask {
     @Override
     public void stop() {
         state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+        try {
+            transformChain.close();
+        } catch (Exception exception) {
+            log.error("Transform close failed,{}", exception);
+        }
         log.warn("Stop a task success.");
     }