You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/24 04:33:26 UTC

[incubator-seatunnel] branch dev updated: [Feature][core] Add close method in plugin. (#1530)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 379791d  [Feature][core] Add close method in plugin. (#1530)
379791d is described below

commit 379791dd441cefced65315226b769d49a562fb82
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Mar 24 12:33:21 2022 +0800

    [Feature][core] Add close method in plugin. (#1530)
    
    * Add lifecycle
    
    * fix checkstyle
    
    * Remove lifecycle
    
    * Fix java doc
    
    * Add comment
    
    * Plugin extends with AutoCloseable
    
    * Add UT
    
    * Format error message
    
    * Use suppressed exception
---
 .../java/org/apache/seatunnel/plugin/Plugin.java   | 34 ++++++++-
 .../apache/seatunnel/flink/source/FakeSource.java  | 16 ++---
 .../org/apache/seatunnel/flink/sink/FileSink.java  |  7 ++
 .../seatunnel/command/BaseTaskExecuteCommand.java  | 30 +++++++-
 .../command/flink/FlinkTaskExecuteCommand.java     |  3 +-
 .../command/spark/SparkTaskExecuteCommand.java     |  3 +-
 .../command/BaseTaskExecuteCommandTest.java        | 80 ++++++++++++++++++++++
 7 files changed, 157 insertions(+), 16 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 32e65e7..488c4d4 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -25,9 +25,21 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import java.io.Serializable;
 
 /**
- * a base interface indicates belonging to SeaTunnel.
+ * A base interface indicates belonging to SeaTunnel.
+ * Plugin will be used as follows:
+ * <pre>{@code
+ *      Plugin<?> plugin = new PluginA<>();
+ *      plugin.setConfig(Config);
+ *      CheckResult checkResult = plugin.checkConfig();
+ *      if (checkResult.getSuccess()) {
+ *         plugin.prepare();
+ *         // plugin execute code
+ *         plugin.close();
+ *      }
+ *
+ * }</pre>
  */
-public interface Plugin<T extends RuntimeEnv> extends Serializable {
+public interface Plugin<T extends RuntimeEnv> extends Serializable, AutoCloseable {
     String RESULT_TABLE_NAME = "result_table_name";
     String SOURCE_TABLE_NAME = "source_table_name";
 
@@ -39,7 +51,23 @@ public interface Plugin<T extends RuntimeEnv> extends Serializable {
         return CheckResult.success();
     }
 
-    default void prepare(T prepareEnv) {
+    /**
+     * This is a lifecycle method, this method will be executed after Plugin created.
+     *
+     * @param env environment
+     */
+    default void prepare(T env) {
+
+    }
+
+    /**
+     * This is a lifecycle method, this method will be executed before Plugin destroy.
+     * It's used to release some resource.
+     *
+     * @throws Exception when close failed.
+     */
+    default void close() throws Exception {
+
     }
 
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
index 54e48fd..6a7ab86 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
@@ -53,18 +53,14 @@ public class FakeSource implements FlinkBatchSource {
     }
 
     @Override
-    public void prepare(FlinkEnvironment prepareEnv) {
-
-    }
-
-    @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
         Random random = new Random();
         return env.getBatchTableEnvironment().toDataSet(
-                env.getBatchTableEnvironment().fromValues(
-                        DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
-                                DataTypes.FIELD("age", DataTypes.INT())),
-                        Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
-                                .collect(Collectors.toList())), Row.class);
+            env.getBatchTableEnvironment().fromValues(
+                DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("age", DataTypes.INT())),
+                Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
+                    .collect(Collectors.toList())), Row.class);
     }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 9e730d0..55dd971 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -150,4 +150,11 @@ public class FileSink implements FlinkStreamSink, FlinkBatchSink {
         String path = StringTemplate.substitute(config.getString(PATH), format);
         filePath = new Path(path);
     }
+
+    @Override
+    public void close() throws Exception {
+        if (outputFormat != null) {
+            outputFormat.close();
+        }
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index c9bf805..ba46c2c 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -61,7 +61,7 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends Ru
     }
 
     /**
-     * Execute prepare method defined in {@link Plugin}.
+     * Execute prepare method defined in {@link org.apache.seatunnel.plugin.Plugin}.
      *
      * @param env     runtimeEnv
      * @param plugins plugin list
@@ -74,6 +74,34 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends Ru
     }
 
     /**
+     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     *
+     * @param plugins plugin list
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        RuntimeException exceptionHolder = null;
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
+                try (Plugin<?> closed = plugin) {
+                    // ignore
+                } catch (Throwable e) {
+                    RuntimeException wrapperException = new RuntimeException(
+                            String.format("plugin %s closed error", plugin.getClass()), e);
+                    if (exceptionHolder == null) {
+                        exceptionHolder = wrapperException;
+                    } else {
+                        exceptionHolder.addSuppressed(wrapperException);
+                    }
+                }
+            }
+        }
+        if (exceptionHolder != null) {
+            throw exceptionHolder;
+        }
+    }
+
+    /**
      * Print the logo.
      */
     protected void showAsciiLogo() {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index f3f7a06..a31c38d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -46,11 +46,12 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
             execution = configBuilder.createExecution();
 
         baseCheckConfig(sources, transforms, sinks);
-        prepare(configBuilder.getEnv(), sources, transforms, sinks);
         showAsciiLogo();
 
         try {
+            prepare(configBuilder.getEnv(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
+            close(sources, transforms, sinks);
         } catch (Exception e) {
             throw new RuntimeException("Execute Flink task error", e);
         }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index 912023f..0064382 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -43,11 +43,12 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
         Execution<BaseSource<SparkEnvironment>, BaseTransform<SparkEnvironment>, BaseSink<SparkEnvironment>, SparkEnvironment>
             execution = configBuilder.createExecution();
         baseCheckConfig(sources, transforms, sinks);
-        prepare(configBuilder.getEnv(), sources, transforms, sinks);
         showAsciiLogo();
 
         try {
+            prepare(configBuilder.getEnv(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
+            close(sources, transforms, sinks);
         } catch (Exception e) {
             throw new RuntimeException("Execute Spark task error", e);
         }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
new file mode 100644
index 0000000..3ddf8d3
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.Plugin;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseTaskExecuteCommandTest {
+
+    private static int CLOSE_TIMES = 0;
+
+    @Test
+    public void testClose() {
+        List<MockPlugin> pluginListA = new ArrayList<>();
+        pluginListA.add(new MockPlugin());
+        pluginListA.add(new MockPlugin());
+        List<MockPlugin> pluginListB = new ArrayList<>();
+        pluginListB.add(new MockPlugin());
+        pluginListB.add(new MockPlugin());
+        MockTaskExecutorCommand mockTaskExecutorCommand = new MockTaskExecutorCommand();
+        try {
+            mockTaskExecutorCommand.close(pluginListA, pluginListB);
+        } catch (Exception ex) {
+            // just print into console
+            ex.printStackTrace();
+        }
+        Assert.assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
+        Assert.assertThrows(RuntimeException.class, () -> mockTaskExecutorCommand.close(pluginListA));
+
+    }
+
+    private static class MockPlugin implements Plugin<FlinkEnvironment> {
+
+        @Override
+        public void setConfig(Config config) {
+        }
+
+        @Override
+        public Config getConfig() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            CLOSE_TIMES++;
+            throw new RuntimeException("Test close with exception, closeTimes:" + CLOSE_TIMES);
+        }
+    }
+
+    private static class MockTaskExecutorCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+
+        @Override
+        public void execute(FlinkCommandArgs commandArgs) {
+
+        }
+    }
+}