You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/13 13:35:02 UTC

[incubator-seatunnel] branch dev updated: close in finally block (#3053)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5ca46103f close in finally block (#3053)
5ca46103f is described below

commit 5ca46103f6bd3c2f856c4d6c949f667e26812691
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Thu Oct 13 21:34:56 2022 +0800

    close in finally block (#3053)
---
 .../seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java       | 3 ++-
 .../apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java   | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index ab9a35e82..c9ea07556 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -80,9 +80,10 @@ public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkComm
                 FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
-            close(sources, transforms, sinks);
         } catch (Exception e) {
             throw new CommandExecuteException("Execute Flink task error", e);
+        } finally {
+            close(sources, transforms, sinks);
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index e9c54b441..944b1a7bc 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -65,9 +65,10 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
             BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
-            close(sources, transforms, sinks);
         } catch (Exception e) {
             throw new CommandExecuteException("Execute Spark task error", e);
+        } finally {
+            close(sources, transforms, sinks);
         }
     }