You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/24 04:33:26 UTC
[incubator-seatunnel] branch dev updated: [Feature][core] Add close method in plugin. (#1530)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 379791d [Feature][core] Add close method in plugin. (#1530)
379791d is described below
commit 379791dd441cefced65315226b769d49a562fb82
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Mar 24 12:33:21 2022 +0800
[Feature][core] Add close method in plugin. (#1530)
* Add lifecycle
* fix checkstyle
* Remove lifecycle
* Fix java doc
* Add comment
* Plugin extends with AutoCloseable
* Add UT
* Format error message
* Use suppressed exception
---
.../java/org/apache/seatunnel/plugin/Plugin.java | 34 ++++++++-
.../apache/seatunnel/flink/source/FakeSource.java | 16 ++---
.../org/apache/seatunnel/flink/sink/FileSink.java | 7 ++
.../seatunnel/command/BaseTaskExecuteCommand.java | 30 +++++++-
.../command/flink/FlinkTaskExecuteCommand.java | 3 +-
.../command/spark/SparkTaskExecuteCommand.java | 3 +-
.../command/BaseTaskExecuteCommandTest.java | 80 ++++++++++++++++++++++
7 files changed, 157 insertions(+), 16 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 32e65e7..488c4d4 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -25,9 +25,21 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.io.Serializable;
/**
- * a base interface indicates belonging to SeaTunnel.
+ * A base interface indicates belonging to SeaTunnel.
+ * Plugin will be used as follows:
+ * <pre>{@code
+ * Plugin<?> plugin = new PluginA<>();
+ * plugin.setConfig(Config);
+ * CheckResult checkResult = plugin.checkConfig();
+ * if (checkResult.getSuccess()) {
+ * plugin.prepare();
+ * // plugin execute code
+ * plugin.close();
+ * }
+ *
+ * }</pre>
*/
-public interface Plugin<T extends RuntimeEnv> extends Serializable {
+public interface Plugin<T extends RuntimeEnv> extends Serializable, AutoCloseable {
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
@@ -39,7 +51,23 @@ public interface Plugin<T extends RuntimeEnv> extends Serializable {
return CheckResult.success();
}
- default void prepare(T prepareEnv) {
+ /**
+ * This is a lifecycle method, this method will be executed after Plugin created.
+ *
+ * @param env environment
+ */
+ default void prepare(T env) {
+
+ }
+
+ /**
+ * This is a lifecycle method, this method will be executed before Plugin destroy.
+ * It's used to release some resource.
+ *
+ * @throws Exception when close failed.
+ */
+ default void close() throws Exception {
+
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
index 54e48fd..6a7ab86 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
@@ -53,18 +53,14 @@ public class FakeSource implements FlinkBatchSource {
}
@Override
- public void prepare(FlinkEnvironment prepareEnv) {
-
- }
-
- @Override
public DataSet<Row> getData(FlinkEnvironment env) {
Random random = new Random();
return env.getBatchTableEnvironment().toDataSet(
- env.getBatchTableEnvironment().fromValues(
- DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
- DataTypes.FIELD("age", DataTypes.INT())),
- Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
- .collect(Collectors.toList())), Row.class);
+ env.getBatchTableEnvironment().fromValues(
+ DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT())),
+ Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, random.nextInt(AGE_LIMIT)))
+ .collect(Collectors.toList())), Row.class);
}
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 9e730d0..55dd971 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -150,4 +150,11 @@ public class FileSink implements FlinkStreamSink, FlinkBatchSink {
String path = StringTemplate.substitute(config.getString(PATH), format);
filePath = new Path(path);
}
+
+ @Override
+ public void close() throws Exception {
+ if (outputFormat != null) {
+ outputFormat.close();
+ }
+ }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index c9bf805..ba46c2c 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -61,7 +61,7 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends Ru
}
/**
- * Execute prepare method defined in {@link Plugin}.
+ * Execute prepare method defined in {@link org.apache.seatunnel.plugin.Plugin}.
*
* @param env runtimeEnv
* @param plugins plugin list
@@ -74,6 +74,34 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends Ru
}
/**
+ * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+ *
+ * @param plugins plugin list
+ */
+ @SafeVarargs
+ protected final void close(List<? extends Plugin<E>>... plugins) {
+ RuntimeException exceptionHolder = null;
+ for (List<? extends Plugin<E>> pluginList : plugins) {
+ for (Plugin<E> plugin : pluginList) {
+ try (Plugin<?> closed = plugin) {
+ // ignore
+ } catch (Throwable e) {
+ RuntimeException wrapperException = new RuntimeException(
+ String.format("plugin %s closed error", plugin.getClass()), e);
+ if (exceptionHolder == null) {
+ exceptionHolder = wrapperException;
+ } else {
+ exceptionHolder.addSuppressed(wrapperException);
+ }
+ }
+ }
+ }
+ if (exceptionHolder != null) {
+ throw exceptionHolder;
+ }
+ }
+
+ /**
* Print the logo.
*/
protected void showAsciiLogo() {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index f3f7a06..a31c38d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -46,11 +46,12 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
- prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
try {
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
+ close(sources, transforms, sinks);
} catch (Exception e) {
throw new RuntimeException("Execute Flink task error", e);
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index 912023f..0064382 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -43,11 +43,12 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
Execution<BaseSource<SparkEnvironment>, BaseTransform<SparkEnvironment>, BaseSink<SparkEnvironment>, SparkEnvironment>
execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
- prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
try {
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
+ close(sources, transforms, sinks);
} catch (Exception e) {
throw new RuntimeException("Execute Spark task error", e);
}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
new file mode 100644
index 0000000..3ddf8d3
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.Plugin;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseTaskExecuteCommandTest {
+
+ private static int CLOSE_TIMES = 0;
+
+ @Test
+ public void testClose() {
+ List<MockPlugin> pluginListA = new ArrayList<>();
+ pluginListA.add(new MockPlugin());
+ pluginListA.add(new MockPlugin());
+ List<MockPlugin> pluginListB = new ArrayList<>();
+ pluginListB.add(new MockPlugin());
+ pluginListB.add(new MockPlugin());
+ MockTaskExecutorCommand mockTaskExecutorCommand = new MockTaskExecutorCommand();
+ try {
+ mockTaskExecutorCommand.close(pluginListA, pluginListB);
+ } catch (Exception ex) {
+ // just print into console
+ ex.printStackTrace();
+ }
+ Assert.assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
+ Assert.assertThrows(RuntimeException.class, () -> mockTaskExecutorCommand.close(pluginListA));
+
+ }
+
+ private static class MockPlugin implements Plugin<FlinkEnvironment> {
+
+ @Override
+ public void setConfig(Config config) {
+ }
+
+ @Override
+ public Config getConfig() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ CLOSE_TIMES++;
+ throw new RuntimeException("Test close with exception, closeTimes:" + CLOSE_TIMES);
+ }
+ }
+
+ private static class MockTaskExecutorCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+
+ @Override
+ public void execute(FlinkCommandArgs commandArgs) {
+
+ }
+ }
+}