You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 15:51:40 UTC

[incubator-seatunnel] branch dev updated: [Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d7fc6f77c [Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)
d7fc6f77c is described below

commit d7fc6f77cef9b1ac88fd2af77c399709e932fcbc
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Tue Jan 17 23:51:34 2023 +0800

    [Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)
    
    * feature:Add Zeta Client ShutdownHook To Cancel Job
---
 release-note.md                                    |  1 +
 .../core/starter/command/AbstractCommandArgs.java  | 36 ++----------
 .../core/starter/command/CommandArgs.java          | 22 ++------
 .../core/starter/utils/FileUtilsTest.java          | 21 ++-----
 .../core/starter/flink/args/FlinkCommandArgs.java  | 50 +++++++---------
 .../core/starter/spark/args/SparkCommandArgs.java  | 24 ++------
 .../starter/seatunnel/args/ClientCommandArgs.java  | 66 +++-------------------
 .../starter/seatunnel/args/ServerCommandArgs.java  | 12 ++--
 .../seatunnel/command/ClientExecuteCommand.java    | 55 ++++++++++++------
 .../seatunnel/engine/client/SeaTunnelClient.java   |  4 +-
 10 files changed, 89 insertions(+), 202 deletions(-)

diff --git a/release-note.md b/release-note.md
index 29e11c066..07f4910a0 100644
--- a/release-note.md
+++ b/release-note.md
@@ -3,6 +3,7 @@
 ## New Feature
 ### Zeta Engine
 - [Script]Add support close engine instance shell
+- [Client]Add Zeta Client ShutdownHook To Cancel Job
 ### Core
 - [Starter][Flink]Support transform-v2 for flink #3396
 ### Transformer
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
index 5b7489a0a..4bc068bc1 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.DeployMode;
 
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 import java.util.Collections;
 import java.util.List;
@@ -28,6 +30,8 @@ import java.util.List;
 /**
  * Abstract class of {@link CommandArgs} implementation to save common configuration settings
  */
+@EqualsAndHashCode(callSuper = true)
+@Data
 public abstract class AbstractCommandArgs extends CommandArgs {
 
     /**
@@ -58,37 +62,5 @@ public abstract class AbstractCommandArgs extends CommandArgs {
             description = "SeaTunnel job name")
     protected String jobName = Constants.LOGO;
 
-    public String getConfigFile() {
-        return configFile;
-    }
-
-    public void setConfigFile(String configFile) {
-        this.configFile = configFile;
-    }
-
-    public List<String> getVariables() {
-        return variables;
-    }
-
-    public void setVariables(List<String> variables) {
-        this.variables = variables;
-    }
-
-    public boolean isCheckConfig() {
-        return checkConfig;
-    }
-
-    public void setCheckConfig(boolean checkConfig) {
-        this.checkConfig = checkConfig;
-    }
-
-    public String getJobName() {
-        return jobName;
-    }
-
-    public void setJobName(String jobName) {
-        this.jobName = jobName;
-    }
-
     public abstract DeployMode getDeployMode();
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
index 5a1a3e344..9a2256553 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
@@ -18,20 +18,22 @@
 package org.apache.seatunnel.core.starter.command;
 
 import com.beust.jcommander.Parameter;
+import lombok.Data;
 
 import java.util.List;
 
 /**
  * CommandArgs, used to create command {@link Command}
  */
+@Data
 public abstract class CommandArgs {
 
     /**
      * Help parameter
      */
     @Parameter(names = {"-h", "--help"},
-            help = true,
-            description = "Show the usage message")
+        help = true,
+        description = "Show the usage message")
     protected boolean help = false;
 
     /**
@@ -39,21 +41,5 @@ public abstract class CommandArgs {
      */
     protected List<String> originalParameters;
 
-    public boolean isHelp() {
-        return help;
-    }
-
-    public void setHelp(boolean help) {
-        this.help = help;
-    }
-
-    public List<String> getOriginalParameters() {
-        return originalParameters;
-    }
-
-    public void setOriginalParameters(List<String> originalParameters) {
-        this.originalParameters = originalParameters;
-    }
-
     public abstract Command<?> buildCommand();
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
index adffe3687..c69c7a128 100644
--- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
 
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -45,6 +47,8 @@ public class FileUtilsTest {
         Assertions.assertEquals("flink.batch.conf", FileUtils.getConfigPath(sparkCommandArgs).toString());
     }
 
+    @EqualsAndHashCode(callSuper = true)
+    @Data
     private static class SparkCommandArgs extends AbstractCommandArgs {
 
         @Parameter(names = {"-c", "--config"},
@@ -59,22 +63,5 @@ public class FileUtilsTest {
             return null;
         }
 
-        public void setDeployMode(DeployMode deployMode) {
-            this.deployMode = deployMode;
-        }
-
-        public DeployMode getDeployMode() {
-            return deployMode;
-        }
-
-        @Override
-        public String getConfigFile() {
-            return this.configFile;
-        }
-
-        @Override
-        public void setConfigFile(String configFile) {
-            this.configFile = configFile;
-        }
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 5440aa9eb..4562d3e03 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -27,21 +27,25 @@ import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 import java.util.ArrayList;
 import java.util.List;
 
+@EqualsAndHashCode(callSuper = true)
+@Data
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-e", "--deploy-mode"},
-            converter = FlinkDeployModeConverter.class,
-            description = "Flink job deploy mode, support [run, run-application]")
+        converter = FlinkDeployModeConverter.class,
+        description = "Flink job deploy mode, support [run, run-application]")
     private DeployMode deployMode = DeployMode.RUN;
 
     @Parameter(names = {"--master", "--target"},
-            converter = FlinkMasterTargetConverter.class,
-            description = "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, " +
-                    "kubernetes-session, yarn-application, kubernetes-application]")
+        converter = FlinkMasterTargetConverter.class,
+        description = "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, " +
+            "kubernetes-session, yarn-application, kubernetes-application]")
     private MasterType masterType;
 
     @Override
@@ -54,32 +58,16 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         }
     }
 
-    public DeployMode getDeployMode() {
-        return deployMode;
-    }
-
-    public void setDeployMode(DeployMode deployMode) {
-        this.deployMode = deployMode;
-    }
-
-    public MasterType getMasterType() {
-        return masterType;
-    }
-
-    public void setMasterType(MasterType masterType) {
-        this.masterType = masterType;
-    }
-
     @Override
     public String toString() {
         return "FlinkCommandArgs{" +
-                "deployMode=" + deployMode +
-                ", masterType=" + masterType +
-                ", configFile='" + configFile + '\'' +
-                ", variables=" + variables +
-                ", jobName='" + jobName + '\'' +
-                ", originalParameters=" + originalParameters +
-                '}';
+            "deployMode=" + deployMode +
+            ", masterType=" + masterType +
+            ", configFile='" + configFile + '\'' +
+            ", variables=" + variables +
+            ", jobName='" + jobName + '\'' +
+            ", originalParameters=" + originalParameters +
+            '}';
     }
 
     public static class FlinkMasterTargetConverter implements IStringConverter<MasterType> {
@@ -102,8 +90,8 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
                 return masterType;
             } else {
                 throw new IllegalArgumentException("SeaTunnel job on flink engine submitted target only " +
-                        "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, " +
-                        "yarn-application, kubernetes-application]");
+                    "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, " +
+                    "yarn-application, kubernetes-application]");
             }
         }
     }
@@ -123,7 +111,7 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
                 return deployMode;
             } else {
                 throw new IllegalArgumentException("SeaTunnel job on flink engine deploy mode only " +
-                        "support these options: [run, run-application]");
+                    "support these options: [run, run-application]");
             }
         }
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
index cd85358ee..f43781838 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -26,10 +26,14 @@ import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 import java.util.ArrayList;
 import java.util.List;
 
+@EqualsAndHashCode(callSuper = true)
+@Data
 public class SparkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-e", "--deploy-mode"},
@@ -39,7 +43,7 @@ public class SparkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-m", "--master"},
         description = "Spark master, support [spark://host:port, mesos://host:port, yarn, " +
-                "k8s://https://host:port, local], default local[*]")
+            "k8s://https://host:port, local], default local[*]")
     private String master = "local[*]";
 
     @Override
@@ -52,22 +56,6 @@ public class SparkCommandArgs extends AbstractCommandArgs {
         }
     }
 
-    public String getMaster() {
-        return master;
-    }
-
-    public DeployMode getDeployMode() {
-        return deployMode;
-    }
-
-    public void setDeployMode(DeployMode deployMode) {
-        this.deployMode = deployMode;
-    }
-
-    public void setMaster(String master) {
-        this.master = master;
-    }
-
     public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
         private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
 
@@ -83,7 +71,7 @@ public class SparkCommandArgs extends AbstractCommandArgs {
                 return deployMode;
             } else {
                 throw new IllegalArgumentException("SeaTunnel job on spark engine deploy mode only " +
-                        "support these options: [cluster, client]");
+                    "support these options: [cluster, client]");
             }
         }
     }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 82046db0e..b5ef604b6 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -27,10 +27,14 @@ import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelConfValidate
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 import java.util.ArrayList;
 import java.util.List;
 
+@EqualsAndHashCode(callSuper = true)
+@Data
 public class ClientCommandArgs extends AbstractCommandArgs {
     @Parameter(names = {"-m", "--master"},
         description = "SeaTunnel job submit master, support [local, cluster]",
@@ -65,6 +69,10 @@ public class ClientCommandArgs extends AbstractCommandArgs {
         description = "list job status")
     private boolean listJob = false;
 
+    @Parameter(names = {"-cj", "--close-job"},
+        description = "Close client the task will also be closed")
+    private boolean closeJob = true;
+
     @Override
     public Command<?> buildCommand() {
         Common.setDeployMode(getDeployMode());
@@ -75,62 +83,6 @@ public class ClientCommandArgs extends AbstractCommandArgs {
         }
     }
 
-    public MasterType getMasterType() {
-        return masterType;
-    }
-
-    public void setMasterType(MasterType masterType) {
-        this.masterType = masterType;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
-    }
-
-    public String getCancelJobId() {
-        return cancelJobId;
-    }
-
-    public void setCancelJobId(String cancelJobId) {
-        this.cancelJobId = cancelJobId;
-    }
-
-    public String getMetricsJobId() {
-        return metricsJobId;
-    }
-
-    public String getRestoreJobId(){
-        return restoreJobId;
-    }
-
-    public String getSavePointJobId(){
-        return savePointJobId;
-    }
-
-    public void setMetricsJobId(String metricsJobId) {
-        this.metricsJobId = metricsJobId;
-    }
-
-    public boolean isListJob() {
-        return listJob;
-    }
-
-    public void setListJob(boolean listJob) {
-        this.listJob = listJob;
-    }
-
     public DeployMode getDeployMode() {
         return DeployMode.CLIENT;
     }
@@ -150,7 +102,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
                 return masterType;
             } else {
                 throw new IllegalArgumentException("SeaTunnel job on st-engine submitted target only " +
-                        "support these options: [local, cluster]");
+                    "support these options: [local, cluster]");
             }
         }
     }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index 2a1a92d2f..d91e18690 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -22,7 +22,11 @@ import org.apache.seatunnel.core.starter.command.CommandArgs;
 import org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand;
 
 import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 
+@EqualsAndHashCode(callSuper = true)
+@Data
 public class ServerCommandArgs extends CommandArgs {
     @Parameter(names = {"-cn", "--cluster"},
         description = "The name of cluster")
@@ -32,12 +36,4 @@ public class ServerCommandArgs extends CommandArgs {
     public Command<?> buildCommand() {
         return new ServerExecuteCommand(this);
     }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
 }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index ede92ef3c..e8ad21092 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import com.hazelcast.client.config.ClientConfig;
@@ -57,6 +58,8 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
 
     private final ClientCommandArgs clientCommandArgs;
 
+    private JobStatus jobStatus;
+
     public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
         this.clientCommandArgs = clientCommandArgs;
     }
@@ -92,10 +95,9 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
             } else if (null != clientCommandArgs.getMetricsJobId()) {
                 String jobMetrics = engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
                 System.out.println(jobMetrics);
-            } else if (null != clientCommandArgs.getSavePointJobId()){
+            } else if (null != clientCommandArgs.getSavePointJobId()) {
                 engineClient.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
-            }
-            else {
+            } else {
                 Path configFile = FileUtils.getConfigPath(clientCommandArgs);
                 checkConfigExist(configFile);
                 JobConfig jobConfig = new JobConfig();
@@ -112,14 +114,24 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
                 startTime = LocalDateTime.now();
                 // create job proxy
                 ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+                // register cancelJob hook
+                if (clientCommandArgs.isCloseJob()) {
+                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                        try {
+                            shutdownHook(clientJobProxy);
+                        } catch (Exception e) {
+                            log.error("Cancel job failed.", e);
+                        }
+                    }));
+                }
                 // get job id
                 long jobId = clientJobProxy.getJobId();
                 JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
                 executorService = Executors.newSingleThreadScheduledExecutor();
                 executorService.scheduleAtFixedRate(jobMetricsRunner, 0,
-                        seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
+                    seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
                 // wait for job complete
-                clientJobProxy.waitForJobComplete();
+                jobStatus = clientJobProxy.waitForJobComplete();
                 // get job end time
                 endTime = LocalDateTime.now();
                 // get job statistic information when job finished
@@ -140,24 +152,24 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
             if (jobMetricsSummary != null) {
                 // print job statistics information when job finished
                 log.info(StringFormatUtils.formatTable(
-                        "Job Statistic Information",
-                        "Start Time",
-                        DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                    "Job Statistic Information",
+                    "Start Time",
+                    DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
 
-                        "End Time",
-                        DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                    "End Time",
+                    DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
 
-                        "Total Time(s)",
-                        Duration.between(startTime, endTime).getSeconds(),
+                    "Total Time(s)",
+                    Duration.between(startTime, endTime).getSeconds(),
 
-                        "Total Read Count",
-                        jobMetricsSummary.getSourceReadCount(),
+                    "Total Read Count",
+                    jobMetricsSummary.getSourceReadCount(),
 
-                        "Total Write Count",
-                        jobMetricsSummary.getSinkWriteCount(),
+                    "Total Write Count",
+                    jobMetricsSummary.getSinkWriteCount(),
 
-                        "Total Failed Count",
-                        jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount()));
+                    "Total Failed Count",
+                    jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount()));
             }
         }
     }
@@ -176,4 +188,11 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
         return namePrefix + "-" + random.nextInt(1000000);
     }
 
+    private void shutdownHook(ClientJobProxy clientJobProxy) {
+        if (jobStatus == null || !jobStatus.isEndState()) {
+            log.warn("Task will be closed due to client shutdown.");
+            clientJobProxy.cancelJob();
+        }
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index c3be76ae7..8c1845549 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -82,9 +82,7 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
     }
 
     public void shutdown() {
-        if (hazelcastClient != null) {
-            hazelcastClient.shutdown();
-        }
+        hazelcastClient.shutdown();
     }
 
     /**