You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 15:51:40 UTC
[incubator-seatunnel] branch dev updated: [Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d7fc6f77c [Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)
d7fc6f77c is described below
commit d7fc6f77cef9b1ac88fd2af77c399709e932fcbc
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Tue Jan 17 23:51:34 2023 +0800
[Feature][Zeta] Add Zeta Client ShutdownHook To Cancel Job (#3946)
* feature: Add Zeta Client ShutdownHook To Cancel Job
---
release-note.md | 1 +
.../core/starter/command/AbstractCommandArgs.java | 36 ++----------
.../core/starter/command/CommandArgs.java | 22 ++------
.../core/starter/utils/FileUtilsTest.java | 21 ++-----
.../core/starter/flink/args/FlinkCommandArgs.java | 50 +++++++---------
.../core/starter/spark/args/SparkCommandArgs.java | 24 ++------
.../starter/seatunnel/args/ClientCommandArgs.java | 66 +++-------------------
.../starter/seatunnel/args/ServerCommandArgs.java | 12 ++--
.../seatunnel/command/ClientExecuteCommand.java | 55 ++++++++++++------
.../seatunnel/engine/client/SeaTunnelClient.java | 4 +-
10 files changed, 89 insertions(+), 202 deletions(-)
diff --git a/release-note.md b/release-note.md
index 29e11c066..07f4910a0 100644
--- a/release-note.md
+++ b/release-note.md
@@ -3,6 +3,7 @@
## New Feature
### Zeta Engine
- [Script]Add support close engine instance shell
+- [Client]Add Zeta Client ShutdownHook To Cancel Job
### Core
- [Starter][Flink]Support transform-v2 for flink #3396
### Transformer
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
index 5b7489a0a..4bc068bc1 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.DeployMode;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import java.util.Collections;
import java.util.List;
@@ -28,6 +30,8 @@ import java.util.List;
/**
* Abstract class of {@link CommandArgs} implementation to save common configuration settings
*/
+@EqualsAndHashCode(callSuper = true)
+@Data
public abstract class AbstractCommandArgs extends CommandArgs {
/**
@@ -58,37 +62,5 @@ public abstract class AbstractCommandArgs extends CommandArgs {
description = "SeaTunnel job name")
protected String jobName = Constants.LOGO;
- public String getConfigFile() {
- return configFile;
- }
-
- public void setConfigFile(String configFile) {
- this.configFile = configFile;
- }
-
- public List<String> getVariables() {
- return variables;
- }
-
- public void setVariables(List<String> variables) {
- this.variables = variables;
- }
-
- public boolean isCheckConfig() {
- return checkConfig;
- }
-
- public void setCheckConfig(boolean checkConfig) {
- this.checkConfig = checkConfig;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
public abstract DeployMode getDeployMode();
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
index 5a1a3e344..9a2256553 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/CommandArgs.java
@@ -18,20 +18,22 @@
package org.apache.seatunnel.core.starter.command;
import com.beust.jcommander.Parameter;
+import lombok.Data;
import java.util.List;
/**
* CommandArgs, used to create command {@link Command}
*/
+@Data
public abstract class CommandArgs {
/**
* Help parameter
*/
@Parameter(names = {"-h", "--help"},
- help = true,
- description = "Show the usage message")
+ help = true,
+ description = "Show the usage message")
protected boolean help = false;
/**
@@ -39,21 +41,5 @@ public abstract class CommandArgs {
*/
protected List<String> originalParameters;
- public boolean isHelp() {
- return help;
- }
-
- public void setHelp(boolean help) {
- this.help = help;
- }
-
- public List<String> getOriginalParameters() {
- return originalParameters;
- }
-
- public void setOriginalParameters(List<String> originalParameters) {
- this.originalParameters = originalParameters;
- }
-
public abstract Command<?> buildCommand();
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
index adffe3687..c69c7a128 100644
--- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
import org.apache.seatunnel.core.starter.command.Command;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -45,6 +47,8 @@ public class FileUtilsTest {
Assertions.assertEquals("flink.batch.conf", FileUtils.getConfigPath(sparkCommandArgs).toString());
}
+ @EqualsAndHashCode(callSuper = true)
+ @Data
private static class SparkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-c", "--config"},
@@ -59,22 +63,5 @@ public class FileUtilsTest {
return null;
}
- public void setDeployMode(DeployMode deployMode) {
- this.deployMode = deployMode;
- }
-
- public DeployMode getDeployMode() {
- return deployMode;
- }
-
- @Override
- public String getConfigFile() {
- return this.configFile;
- }
-
- @Override
- public void setConfigFile(String configFile) {
- this.configFile = configFile;
- }
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 5440aa9eb..4562d3e03 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -27,21 +27,25 @@ import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import java.util.ArrayList;
import java.util.List;
+@EqualsAndHashCode(callSuper = true)
+@Data
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-e", "--deploy-mode"},
- converter = FlinkDeployModeConverter.class,
- description = "Flink job deploy mode, support [run, run-application]")
+ converter = FlinkDeployModeConverter.class,
+ description = "Flink job deploy mode, support [run, run-application]")
private DeployMode deployMode = DeployMode.RUN;
@Parameter(names = {"--master", "--target"},
- converter = FlinkMasterTargetConverter.class,
- description = "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, " +
- "kubernetes-session, yarn-application, kubernetes-application]")
+ converter = FlinkMasterTargetConverter.class,
+ description = "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, " +
+ "kubernetes-session, yarn-application, kubernetes-application]")
private MasterType masterType;
@Override
@@ -54,32 +58,16 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
}
}
- public DeployMode getDeployMode() {
- return deployMode;
- }
-
- public void setDeployMode(DeployMode deployMode) {
- this.deployMode = deployMode;
- }
-
- public MasterType getMasterType() {
- return masterType;
- }
-
- public void setMasterType(MasterType masterType) {
- this.masterType = masterType;
- }
-
@Override
public String toString() {
return "FlinkCommandArgs{" +
- "deployMode=" + deployMode +
- ", masterType=" + masterType +
- ", configFile='" + configFile + '\'' +
- ", variables=" + variables +
- ", jobName='" + jobName + '\'' +
- ", originalParameters=" + originalParameters +
- '}';
+ "deployMode=" + deployMode +
+ ", masterType=" + masterType +
+ ", configFile='" + configFile + '\'' +
+ ", variables=" + variables +
+ ", jobName='" + jobName + '\'' +
+ ", originalParameters=" + originalParameters +
+ '}';
}
public static class FlinkMasterTargetConverter implements IStringConverter<MasterType> {
@@ -102,8 +90,8 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
return masterType;
} else {
throw new IllegalArgumentException("SeaTunnel job on flink engine submitted target only " +
- "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, " +
- "yarn-application, kubernetes-application]");
+ "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, " +
+ "yarn-application, kubernetes-application]");
}
}
}
@@ -123,7 +111,7 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
return deployMode;
} else {
throw new IllegalArgumentException("SeaTunnel job on flink engine deploy mode only " +
- "support these options: [run, run-application]");
+ "support these options: [run, run-application]");
}
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
index cd85358ee..f43781838 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -26,10 +26,14 @@ import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import java.util.ArrayList;
import java.util.List;
+@EqualsAndHashCode(callSuper = true)
+@Data
public class SparkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-e", "--deploy-mode"},
@@ -39,7 +43,7 @@ public class SparkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-m", "--master"},
description = "Spark master, support [spark://host:port, mesos://host:port, yarn, " +
- "k8s://https://host:port, local], default local[*]")
+ "k8s://https://host:port, local], default local[*]")
private String master = "local[*]";
@Override
@@ -52,22 +56,6 @@ public class SparkCommandArgs extends AbstractCommandArgs {
}
}
- public String getMaster() {
- return master;
- }
-
- public DeployMode getDeployMode() {
- return deployMode;
- }
-
- public void setDeployMode(DeployMode deployMode) {
- this.deployMode = deployMode;
- }
-
- public void setMaster(String master) {
- this.master = master;
- }
-
public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
@@ -83,7 +71,7 @@ public class SparkCommandArgs extends AbstractCommandArgs {
return deployMode;
} else {
throw new IllegalArgumentException("SeaTunnel job on spark engine deploy mode only " +
- "support these options: [cluster, client]");
+ "support these options: [cluster, client]");
}
}
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 82046db0e..b5ef604b6 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -27,10 +27,14 @@ import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelConfValidate
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import java.util.ArrayList;
import java.util.List;
+@EqualsAndHashCode(callSuper = true)
+@Data
public class ClientCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-m", "--master"},
description = "SeaTunnel job submit master, support [local, cluster]",
@@ -65,6 +69,10 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "list job status")
private boolean listJob = false;
+ @Parameter(names = {"-cj", "--close-job"},
+ description = "Close client the task will also be closed")
+ private boolean closeJob = true;
+
@Override
public Command<?> buildCommand() {
Common.setDeployMode(getDeployMode());
@@ -75,62 +83,6 @@ public class ClientCommandArgs extends AbstractCommandArgs {
}
}
- public MasterType getMasterType() {
- return masterType;
- }
-
- public void setMasterType(MasterType masterType) {
- this.masterType = masterType;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- public String getCancelJobId() {
- return cancelJobId;
- }
-
- public void setCancelJobId(String cancelJobId) {
- this.cancelJobId = cancelJobId;
- }
-
- public String getMetricsJobId() {
- return metricsJobId;
- }
-
- public String getRestoreJobId(){
- return restoreJobId;
- }
-
- public String getSavePointJobId(){
- return savePointJobId;
- }
-
- public void setMetricsJobId(String metricsJobId) {
- this.metricsJobId = metricsJobId;
- }
-
- public boolean isListJob() {
- return listJob;
- }
-
- public void setListJob(boolean listJob) {
- this.listJob = listJob;
- }
-
public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}
@@ -150,7 +102,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
return masterType;
} else {
throw new IllegalArgumentException("SeaTunnel job on st-engine submitted target only " +
- "support these options: [local, cluster]");
+ "support these options: [local, cluster]");
}
}
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index 2a1a92d2f..d91e18690 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -22,7 +22,11 @@ import org.apache.seatunnel.core.starter.command.CommandArgs;
import org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand;
import com.beust.jcommander.Parameter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+@EqualsAndHashCode(callSuper = true)
+@Data
public class ServerCommandArgs extends CommandArgs {
@Parameter(names = {"-cn", "--cluster"},
description = "The name of cluster")
@@ -32,12 +36,4 @@ public class ServerCommandArgs extends CommandArgs {
public Command<?> buildCommand() {
return new ServerExecuteCommand(this);
}
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index ede92ef3c..e8ad21092 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import com.hazelcast.client.config.ClientConfig;
@@ -57,6 +58,8 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
private final ClientCommandArgs clientCommandArgs;
+ private JobStatus jobStatus;
+
public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
this.clientCommandArgs = clientCommandArgs;
}
@@ -92,10 +95,9 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
} else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics = engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
System.out.println(jobMetrics);
- } else if (null != clientCommandArgs.getSavePointJobId()){
+ } else if (null != clientCommandArgs.getSavePointJobId()) {
engineClient.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
- }
- else {
+ } else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
@@ -112,14 +114,24 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
startTime = LocalDateTime.now();
// create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ // register cancelJob hook
+ if (clientCommandArgs.isCloseJob()) {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ shutdownHook(clientJobProxy);
+ } catch (Exception e) {
+ log.error("Cancel job failed.", e);
+ }
+ }));
+ }
// get job id
long jobId = clientJobProxy.getJobId();
JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(jobMetricsRunner, 0,
- seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
+ seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
// wait for job complete
- clientJobProxy.waitForJobComplete();
+ jobStatus = clientJobProxy.waitForJobComplete();
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
@@ -140,24 +152,24 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
if (jobMetricsSummary != null) {
// print job statistics information when job finished
log.info(StringFormatUtils.formatTable(
- "Job Statistic Information",
- "Start Time",
- DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+ "Job Statistic Information",
+ "Start Time",
+ DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
- "End Time",
- DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+ "End Time",
+ DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
- "Total Time(s)",
- Duration.between(startTime, endTime).getSeconds(),
+ "Total Time(s)",
+ Duration.between(startTime, endTime).getSeconds(),
- "Total Read Count",
- jobMetricsSummary.getSourceReadCount(),
+ "Total Read Count",
+ jobMetricsSummary.getSourceReadCount(),
- "Total Write Count",
- jobMetricsSummary.getSinkWriteCount(),
+ "Total Write Count",
+ jobMetricsSummary.getSinkWriteCount(),
- "Total Failed Count",
- jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount()));
+ "Total Failed Count",
+ jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount()));
}
}
}
@@ -176,4 +188,11 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
return namePrefix + "-" + random.nextInt(1000000);
}
+ private void shutdownHook(ClientJobProxy clientJobProxy) {
+ if (jobStatus == null || !jobStatus.isEndState()) {
+ log.warn("Task will be closed due to client shutdown.");
+ clientJobProxy.cancelJob();
+ }
+ }
+
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index c3be76ae7..8c1845549 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -82,9 +82,7 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
}
public void shutdown() {
- if (hazelcastClient != null) {
- hazelcastClient.shutdown();
- }
+ hazelcastClient.shutdown();
}
/**