You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/27 06:09:49 UTC
[rocketmq-connect] branch master updated: [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 732f127 [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
732f127 is described below
commit 732f1274ddd75312f9750e802167e51fe19e66aa
Author: xiaoyi <su...@163.com>
AuthorDate: Fri May 27 14:09:46 2022 +0800
[ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
Co-authored-by: “sunxiaojian” <“sunxiaojian926@163.com”>
---
.../connect/runtime/connectorwrapper/TransformChain.java | 15 ++++++++++++++-
.../connect/runtime/connectorwrapper/WorkerSinkTask.java | 5 +++++
.../runtime/connectorwrapper/WorkerSourceTask.java | 5 +++++
3 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 3de0222..485297a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,6 +23,8 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
+
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -35,7 +37,7 @@ import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TransformChain<R extends ConnectRecord> {
+public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@@ -122,4 +124,15 @@ public class TransformChain<R extends ConnectRecord> {
Plugin.compareAndSwapLoaders(currentThreadLoader);
return transform;
}
+
+ /**
+ * Calls stop() on each transform in transformList.
+ * @throws Exception if a stop() call throws; the loop has no catch, so later transforms are not stopped
+ */
+ @Override
+ public void close() throws Exception {
+ for (Transform transform : transformList) {
+ transform.stop();
+ }
+ }
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 8f95747..40db678 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -502,6 +502,11 @@ public class WorkerSinkTask implements WorkerTask {
@Override
public void stop() {
state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+ try {
+ transformChain.close(); // stops the transforms held by the chain
+ } catch (Exception exception) { // a close failure is logged, not rethrown
+ log.error("Transform close failed", exception); // trailing Throwable is logged as the exception; no {} placeholder
+ }
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 8e62ca3..2743481 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -246,6 +246,11 @@ public class WorkerSourceTask implements WorkerTask {
@Override
public void stop() {
state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+ try {
+ transformChain.close(); // stops the transforms held by the chain
+ } catch (Exception exception) { // a close failure is logged, not rethrown
+ log.error("Transform close failed", exception); // trailing Throwable is logged as the exception; no {} placeholder
+ }
log.warn("Stop a task success.");
}