You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2024/04/09 08:59:34 UTC
(seatunnel) branch dev updated: [Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer() (#6666)
This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 66d8502da5 [Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer() (#6666)
66d8502da5 is described below
commit 66d8502da59f47d4eeda1f16421b72601f9278ca
Author: xiaochen <59...@qq.com>
AuthorDate: Tue Apr 9 16:59:29 2024 +0800
[Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer() (#6666)
* remove plugin lifecycle
* remove transform fallback
* code style
* &&
* optimize code
* remove UnsupportedOperationException
---------
Co-authored-by: ClownXC <ch...@163.com>
---
.../seatunnel/engine/server/SeaTunnelServer.java | 28 ++++---
.../engine/server/master/JobHistoryService.java | 2 +-
.../server/rest/RestHttpGetCommandProcessor.java | 88 +++++++---------------
3 files changed, 45 insertions(+), 73 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index e9dcdca779..7270f01e03 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -190,7 +190,7 @@ public class SeaTunnelServer
.getProperty(INVOCATION_MAX_RETRY_COUNT.getName());
int maxRetry =
hazelcastInvocationMaxRetry == null
- ? 250 * 2
+ ? Integer.parseInt(INVOCATION_MAX_RETRY_COUNT.getDefaultValue()) * 2
: Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
String hazelcastRetryPause =
@@ -199,12 +199,14 @@ public class SeaTunnelServer
.getProperty(INVOCATION_RETRY_PAUSE.getName());
int retryPause =
- hazelcastRetryPause == null ? 500 : Integer.parseInt(hazelcastRetryPause);
+ hazelcastRetryPause == null
+ ? Integer.parseInt(INVOCATION_RETRY_PAUSE.getDefaultValue())
+ : Integer.parseInt(hazelcastRetryPause);
- while (isMasterNode()
- && !coordinatorService.isCoordinatorActive()
+ while (isRunning
&& retryCount < maxRetry
- && isRunning) {
+ && !coordinatorService.isCoordinatorActive()
+ && isMasterNode()) {
try {
LOGGER.warning(
"This is master node, waiting the coordinator service init finished");
@@ -254,13 +256,15 @@ public class SeaTunnelServer
public boolean isMasterNode() {
// must retry until the cluster have master node
try {
- return RetryUtils.retryWithException(
- () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
- new RetryUtils.RetryMaterial(
- Constant.OPERATION_RETRY_TIME,
- true,
- exception -> exception instanceof NullPointerException && isRunning,
- Constant.OPERATION_RETRY_SLEEP));
+ return Boolean.TRUE.equals(
+ RetryUtils.retryWithException(
+ () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ isRunning && exception instanceof NullPointerException,
+ Constant.OPERATION_RETRY_SLEEP)));
} catch (InterruptedException e) {
LOGGER.info("master node check interrupted");
return false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 83b5ab29e7..f3905a9e92 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -248,7 +248,7 @@ public class JobHistoryService {
private Long jobId;
private String jobName;
private JobStatus jobStatus;
- private long submitTime;
+ private Long submitTime;
private Long finishTime;
private Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap;
private String errorMessage;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 79f29575a1..25cd51474f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
@@ -44,7 +45,6 @@ import com.hazelcast.cluster.Member;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
-import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
@@ -54,10 +54,8 @@ import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
-import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -210,7 +208,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.getNodeEngine()
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
-
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
JsonArray jobs =
finishedJob.values().stream()
.filter(
@@ -226,7 +224,6 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.map(
jobState -> {
Long jobId = jobState.getJobId();
- SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
String jobMetrics;
if (seaTunnelServer == null) {
jobMetrics =
@@ -243,15 +240,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.getJobMetrics(jobId)
.toJsonString();
}
-
- JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);
-
- return convertToJson(
- jobState,
- jobMetrics,
- Json.parse(JsonUtils.toJsonString(jobDAGInfo))
- .asObject(),
- jobId);
+ return getJobInfoJson(
+ jobState, jobMetrics, finishedJobDAGInfo.get(jobId));
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add);
@@ -301,7 +291,10 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.get(Long.valueOf(jobId));
this.prepareResponse(
command,
- convertToJson(finishedJobState, finishedJobMetrics, finishedJobDAGInfo));
+ getJobInfoJson(
+ finishedJobState,
+ finishedJobMetrics.toJsonString(),
+ finishedJobDAGInfo));
} else {
this.prepareResponse(command, new JsonObject().add(RestConstant.JOB_ID, jobId));
}
@@ -353,7 +346,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
- if (!seaTunnelServer.isMasterNode() && shouldBeMaster) {
+ if (shouldBeMaster && !seaTunnelServer.isMasterNode()) {
return null;
}
return seaTunnelServer;
@@ -374,7 +367,11 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.getSerializationService()
.toObject(jobInfo.getJobImmutableInformation()));
- ClassLoaderService classLoaderService = getSeaTunnelServer(false).getClassLoaderService();
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
+ ClassLoaderService classLoaderService =
+ seaTunnelServer == null
+ ? getSeaTunnelServer(false).getClassLoaderService()
+ : seaTunnelServer.getClassLoaderService();
ClassLoader classLoader =
classLoaderService.getClassLoader(
jobId, jobImmutableInformation.getPluginJarsUrls());
@@ -385,7 +382,6 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
jobImmutableInformation.getLogicalDag());
classLoaderService.releaseClassLoader(jobId, jobImmutableInformation.getPluginJarsUrls());
- SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
String jobMetrics;
JobStatus jobStatus;
if (seaTunnelServer == null) {
@@ -416,8 +412,9 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(new Date(jobImmutableInformation.getCreateTime())))
+ DateTimeUtils.toString(
+ jobImmutableInformation.getCreateTime(),
+ DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
RestConstant.PLUGIN_JARS_URLS,
@@ -439,53 +436,24 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
return jobInfoJson;
}
- private JsonObject convertToJson(
- JobState finishedJobState,
- JobMetrics finishedJobMetrics,
- JobDAGInfo finishedJobDAGInfo) {
- JsonObject jobInfoJson = new JsonObject();
- jobInfoJson
- .add(RestConstant.JOB_ID, String.valueOf(finishedJobState.getJobId()))
- .add(RestConstant.JOB_NAME, finishedJobState.getJobName())
- .add(RestConstant.JOB_STATUS, finishedJobState.getJobStatus().toString())
- .add(RestConstant.ERROR_MSG, finishedJobState.getErrorMessage())
- .add(
- RestConstant.CREATE_TIME,
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(new Date(finishedJobState.getSubmitTime())))
- .add(
- RestConstant.FINISH_TIME,
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(new Date(finishedJobState.getFinishTime())))
- .add(
- RestConstant.JOB_DAG,
- Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
- .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
- .add(
- RestConstant.METRICS,
- JsonUtil.toJsonObject(getJobMetrics(finishedJobMetrics.toJsonString())));
- return jobInfoJson;
- }
-
- private JsonObject convertToJson(
- JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
- JsonObject jobInfoJson = new JsonObject();
- jobInfoJson
- .add(RestConstant.JOB_ID, String.valueOf(jobId))
+ private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) {
+ return new JsonObject()
+ .add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
.add(RestConstant.JOB_NAME, jobState.getJobName())
.add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
.add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
.add(
RestConstant.CREATE_TIME,
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(new Date(jobState.getSubmitTime())))
+ DateTimeUtils.toString(
+ jobState.getSubmitTime(),
+ DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(
RestConstant.FINISH_TIME,
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(new Date(jobState.getFinishTime())))
- .add(RestConstant.JOB_DAG, jobDAGInfo)
+ DateTimeUtils.toString(
+ jobState.getFinishTime(),
+ DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
+ .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+ .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
-
- return jobInfoJson;
}
}