You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/03/21 10:41:11 UTC

[GitHub] [incubator-seatunnel] ruanwenjun opened a new pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

ruanwenjun opened a new pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530


   ## Purpose of this pull request
   
   close #1529 
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in you PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/developement/NewLicenseGuide.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] kezhenxu94 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831812068



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -39,7 +51,24 @@ default CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    default void prepare(T prepareEnv) {
+    /**
+     * This is a lifecycle method, this method will be executed after Plugin created.
+     *
+     * @param env environment
+     */
+    default void prepare(T env) {
+
+    }
+
+    /**
+     * This is a lifecycle method, this method will be executed before Plugin destroy.
+     * It's used to release some resource. We will have multiple plugin, if the former plugin throw exception when close,
+     * The follow plugins will not be close.
+     *
+     * @throws Exception when close failed.
+     */
+    default void close() throws Exception {

Review comment:
       I'd consider this is not acceptable for users or plugin developers.
   
   Plugin developers write a plugin and have a `close` method to clean the resources, they don't even know how the `close` method will be invoked, individually or as you said `We will have multiple plugin, if the former plugin throw exception when close,`, if I were a plugin developer, I assume the `close` method of my plugin should be ALWAYS invoked no matter how other plugins' `close` methods behave .




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832276124



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       Yes, here is a bug 😭 , I have solved this and add UT.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831031966



##########
File path: seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
##########
@@ -52,19 +52,18 @@ public CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    @Override
-    public void prepare(FlinkEnvironment prepareEnv) {
-
-    }
-
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
         Random random = new Random();
         return env.getBatchTableEnvironment().toDataSet(
-                env.getBatchTableEnvironment().fromValues(
-                        DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
-                                DataTypes.FIELD("age", DataTypes.INT())),
-                        Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
-                                .collect(Collectors.toList())), Row.class);
+            env.getBatchTableEnvironment().fromValues(
+                DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("age", DataTypes.INT())),
+                Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
+                    .collect(Collectors.toList())), Row.class);
+    }
+
+    public void close() throws Exception {
+        System.out.println("FakeSource close");

Review comment:
       Thanks for your review, done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] kezhenxu94 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832140268



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -25,9 +25,21 @@
 import java.io.Serializable;
 
 /**
- * a base interface indicates belonging to SeaTunnel.
+ * A base interface indicates belonging to SeaTunnel.
+ * Plugin will be used as follows:
+ * <pre>{@code
+ *      Plugin<?> plugin = new PluginA<>();
+ *      plugin.setConfig(Config);
+ *      CheckResult checkResult = plugin.checkConfig();
+ *      if (checkResult.getSuccess()) {
+ *         plugin.prepare();
+ *         // plugin execute code
+ *         plugin.close();

Review comment:
       What about
   
   ```suggestion
    *      try (Plugin<?> plugin = new PluginA<>()) { // Put the plugin in try-with-resource to ensure it's always closed.
    *        plugin.setConfig(Config);
    *        CheckResult checkResult = plugin.checkConfig();
    *        if (checkResult.getSuccess()) {
    *          plugin.prepare();
    *          // plugin execute code
    *        }
    *.     }
   ```

##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -25,9 +25,21 @@
 import java.io.Serializable;
 
 /**
- * a base interface indicates belonging to SeaTunnel.
+ * A base interface indicates belonging to SeaTunnel.
+ * Plugin will be used as follows:
+ * <pre>{@code
+ *      Plugin<?> plugin = new PluginA<>();
+ *      plugin.setConfig(Config);
+ *      CheckResult checkResult = plugin.checkConfig();
+ *      if (checkResult.getSuccess()) {
+ *         plugin.prepare();
+ *         // plugin execute code
+ *         plugin.close();

Review comment:
       What about
   
   ```suggestion
    *      try (Plugin<?> plugin = new PluginA<>()) { // Put the plugin in try-with-resource block to ensure it's always closed.
    *        plugin.setConfig(Config);
    *        CheckResult checkResult = plugin.checkConfig();
    *        if (checkResult.getSuccess()) {
    *          plugin.prepare();
    *          // plugin execute code
    *        }
    *.     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] Rianico commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
Rianico commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832362310



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +72,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        Optional<RuntimeException> exceptionHolder = Optional.empty();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {
+                    exceptionHolder = Optional.of(new RuntimeException(String.format(
+                            "plugin %s close error", plugin.getClass().getName()), exceptionHolder.orElse(null)));

Review comment:
       If mutiple plugins close failedly, the subsequent  exception will cover the previous exception and we will lose the previouse exception info.
   Maybe we can save all exception firstly (e.g. plugin -> exception) , continue executing next plugin's `close()` function, and throw the exception or return error message to notify the user which plugin close failedly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] kezhenxu94 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832195237



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       Oh yes, @ruanwenjun  you still throw the exception once there is a plugin throws an exception, causing the remaining plugins not to closed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832189800



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       can you add some failure case UT? it seems these codes also can not achive the 'close all plugins' goal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832776186



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +72,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        Optional<RuntimeException> exceptionHolder = Optional.empty();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {
+                    exceptionHolder = Optional.of(new RuntimeException(String.format(
+                            "plugin %s close error", plugin.getClass().getName()), exceptionHolder.orElse(null)));

Review comment:
       @Rianico Yes, has modified as you said, and all the plugin error message will be stored now. Please take a look.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832106407



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -39,7 +51,24 @@ default CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    default void prepare(T prepareEnv) {
+    /**
+     * This is a lifecycle method, this method will be executed after Plugin created.
+     *
+     * @param env environment
+     */
+    default void prepare(T env) {
+
+    }
+
+    /**
+     * This is a lifecycle method, this method will be executed before Plugin destroy.
+     * It's used to release some resource. We will have multiple plugin, if the former plugin throw exception when close,
+     * The follow plugins will not be close.
+     *
+     * @throws Exception when close failed.
+     */
+    default void close() throws Exception {

Review comment:
       Thanks for your suggestion, this make sense to me. I have changed the code, now the close method will always be invoked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831789110



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try {
+                    plugin.close();

Review comment:
       will close() be implemented to do resource clean? are you assume that if one plugin throw exception when call close(), the followed plugins no need to call close()?
   maybe we should add some comment to stress what it shouldn't do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r830966644



##########
File path: seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
##########
@@ -52,19 +52,18 @@ public CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    @Override
-    public void prepare(FlinkEnvironment prepareEnv) {
-
-    }
-
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
         Random random = new Random();
         return env.getBatchTableEnvironment().toDataSet(
-                env.getBatchTableEnvironment().fromValues(
-                        DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
-                                DataTypes.FIELD("age", DataTypes.INT())),
-                        Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
-                                .collect(Collectors.toList())), Row.class);
+            env.getBatchTableEnvironment().fromValues(
+                DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("age", DataTypes.INT())),
+                Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
+                    .collect(Collectors.toList())), Row.class);
+    }
+
+    public void close() throws Exception {
+        System.out.println("FakeSource close");

Review comment:
       Please use Log repalce SystemOut




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831703019



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/LifeCycle.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin;
+
+public interface LifeCycle<T> {
+
+    /**
+     * Open method.
+     *
+     * @param env envType
+     */
+    void open(T env);
+
+    /**
+     * Close method.
+     */
+    void close() throws Exception;

Review comment:
       OK, let's put him in Plugin first, I just worry that we may add more method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831800695



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try {
+                    plugin.close();

Review comment:
       @yx91490 Yes, close is mainly used to do resource clean, this is a good catch, I have added some comment in `close` method.
   We have another choose to catch the exception and make it safe to close for each plugin, but I think throw it directly is OK.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832189800



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       can you add a failure case UT? it seems these code can not achive the 'close all' goal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832825904



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +74,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        List<RuntimeException> exceptionHolder = new ArrayList<>();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {
+                    exceptionHolder.add(new RuntimeException(String.format("plugin %s closed error, errorMessage: %s",

Review comment:
       @yx91490 Thanks for your review, agree with you, use suppressed exception is great. And I think use `RuntimeException` here is accepted, since this method will not execute in high frequency.
   
   Please take a look if there is still anything need to modify.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832824951



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +74,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        List<RuntimeException> exceptionHolder = new ArrayList<>();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832189800



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       can you add a failure case UT? it seems these codes also can not achive the 'close all' goal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r833208137



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +74,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        List<RuntimeException> exceptionHolder = new ArrayList<>();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {
+                    exceptionHolder.add(new RuntimeException(String.format("plugin %s closed error, errorMessage: %s",

Review comment:
       LGTM




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832787349



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +74,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        List<RuntimeException> exceptionHolder = new ArrayList<>();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {

Review comment:
       I'd suggest catch Throwable.

##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +74,30 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        List<RuntimeException> exceptionHolder = new ArrayList<>();
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Exception e) {
+                    exceptionHolder.add(new RuntimeException(String.format("plugin %s closed error, errorMessage: %s",

Review comment:
       If you only collect message finally, you don't need to new RuntimeException because it's too expensive.
   but I suggest use Suppressed exception feature to keep stacktrace.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add close method in plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r832189800



##########
File path: seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
##########
@@ -73,6 +73,24 @@ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
         }
     }
 
+    /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            pluginList.forEach(plugin -> {
+                try (Plugin<?> closed = plugin) {

Review comment:
       can you add some failure case UT? it seems these codes also can not achive the 'close all' goal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] yx91490 commented on a change in pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

Posted by GitBox <gi...@apache.org>.
yx91490 commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831187569



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/LifeCycle.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin;
+
+public interface LifeCycle<T> {
+
+    /**
+     * Open method.
+     *
+     * @param env envType
+     */
+    void open(T env);
+
+    /**
+     * Close method.
+     */
+    void close() throws Exception;

Review comment:
       If LifeCycle was not referenced by other framework, seems no need to be a individual interface, may can put close() to Plugin directly.

##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -39,7 +39,12 @@ default CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    default void prepare(T prepareEnv) {

Review comment:
       I suggest not to change the `prepare` function name for compatibility.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] ruanwenjun commented on a change in pull request #1530: [Feature][core] Add plugin lifecycle of plugin.

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on a change in pull request #1530:
URL: https://github.com/apache/incubator-seatunnel/pull/1530#discussion_r831743806



##########
File path: seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
##########
@@ -39,7 +39,12 @@ default CheckResult checkConfig() {
         return CheckResult.success();
     }
 
-    default void prepare(T prepareEnv) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org