You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/13 13:35:02 UTC
[incubator-seatunnel] branch dev updated: close in finally block (#3053)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5ca46103f close in finally block (#3053)
5ca46103f is described below
commit 5ca46103f6bd3c2f856c4d6c949f667e26812691
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Thu Oct 13 21:34:56 2022 +0800
close in finally block (#3053)
---
.../seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java | 3 ++-
.../apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java | 3 ++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index ab9a35e82..c9ea07556 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -80,9 +80,10 @@ public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkComm
FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
prepare(executionContext.getEnvironment(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
- close(sources, transforms, sinks);
} catch (Exception e) {
throw new CommandExecuteException("Execute Flink task error", e);
+ } finally {
+ close(sources, transforms, sinks);
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index e9c54b441..944b1a7bc 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -65,9 +65,10 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
prepare(executionContext.getEnvironment(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
- close(sources, transforms, sinks);
} catch (Exception e) {
throw new CommandExecuteException("Execute Spark task error", e);
+ } finally {
+ close(sources, transforms, sinks);
}
}