You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2024/04/09 08:59:34 UTC

(seatunnel) branch dev updated: [Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer() (#6666)

This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66d8502da5 [Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer()  (#6666)
66d8502da5 is described below

commit 66d8502da59f47d4eeda1f16421b72601f9278ca
Author: xiaochen <59...@qq.com>
AuthorDate: Tue Apr 9 16:59:29 2024 +0800

    [Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getSeaTunnelServer()  (#6666)
    
    * remove plugin lifecycle
    
    * remove transform fallback
    
    * code style
    
    * &&
    
    * optimize code
    
    * remove UnsupportedOperationException
    
    ---------
    
    Co-authored-by: ClownXC <ch...@163.com>
---
 .../seatunnel/engine/server/SeaTunnelServer.java   | 28 ++++---
 .../engine/server/master/JobHistoryService.java    |  2 +-
 .../server/rest/RestHttpGetCommandProcessor.java   | 88 +++++++---------------
 3 files changed, 45 insertions(+), 73 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index e9dcdca779..7270f01e03 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -190,7 +190,7 @@ public class SeaTunnelServer
                             .getProperty(INVOCATION_MAX_RETRY_COUNT.getName());
             int maxRetry =
                     hazelcastInvocationMaxRetry == null
-                            ? 250 * 2
+                            ? Integer.parseInt(INVOCATION_MAX_RETRY_COUNT.getDefaultValue()) * 2
                             : Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
 
             String hazelcastRetryPause =
@@ -199,12 +199,14 @@ public class SeaTunnelServer
                             .getProperty(INVOCATION_RETRY_PAUSE.getName());
 
             int retryPause =
-                    hazelcastRetryPause == null ? 500 : Integer.parseInt(hazelcastRetryPause);
+                    hazelcastRetryPause == null
+                            ? Integer.parseInt(INVOCATION_RETRY_PAUSE.getDefaultValue())
+                            : Integer.parseInt(hazelcastRetryPause);
 
-            while (isMasterNode()
-                    && !coordinatorService.isCoordinatorActive()
+            while (isRunning
                     && retryCount < maxRetry
-                    && isRunning) {
+                    && !coordinatorService.isCoordinatorActive()
+                    && isMasterNode()) {
                 try {
                     LOGGER.warning(
                             "This is master node, waiting the coordinator service init finished");
@@ -254,13 +256,15 @@ public class SeaTunnelServer
     public boolean isMasterNode() {
         // must retry until the cluster have master node
         try {
-            return RetryUtils.retryWithException(
-                    () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
-                    new RetryUtils.RetryMaterial(
-                            Constant.OPERATION_RETRY_TIME,
-                            true,
-                            exception -> exception instanceof NullPointerException && isRunning,
-                            Constant.OPERATION_RETRY_SLEEP));
+            return Boolean.TRUE.equals(
+                    RetryUtils.retryWithException(
+                            () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    true,
+                                    exception ->
+                                            isRunning && exception instanceof NullPointerException,
+                                    Constant.OPERATION_RETRY_SLEEP)));
         } catch (InterruptedException e) {
             LOGGER.info("master node check interrupted");
             return false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 83b5ab29e7..f3905a9e92 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -248,7 +248,7 @@ public class JobHistoryService {
         private Long jobId;
         private String jobName;
         private JobStatus jobStatus;
-        private long submitTime;
+        private Long submitTime;
         private Long finishTime;
         private Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap;
         private String errorMessage;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 79f29575a1..25cd51474f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
@@ -44,7 +45,6 @@ import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpGetCommand;
-import com.hazelcast.internal.json.Json;
 import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
 import com.hazelcast.internal.json.JsonValue;
@@ -54,10 +54,8 @@ import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 
-import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -210,7 +208,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                         .getNodeEngine()
                         .getHazelcastInstance()
                         .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
-
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
         JsonArray jobs =
                 finishedJob.values().stream()
                         .filter(
@@ -226,7 +224,6 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                         .map(
                                 jobState -> {
                                     Long jobId = jobState.getJobId();
-                                    SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
                                     String jobMetrics;
                                     if (seaTunnelServer == null) {
                                         jobMetrics =
@@ -243,15 +240,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                                                         .getJobMetrics(jobId)
                                                         .toJsonString();
                                     }
-
-                                    JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);
-
-                                    return convertToJson(
-                                            jobState,
-                                            jobMetrics,
-                                            Json.parse(JsonUtils.toJsonString(jobDAGInfo))
-                                                    .asObject(),
-                                            jobId);
+                                    return getJobInfoJson(
+                                            jobState, jobMetrics, finishedJobDAGInfo.get(jobId));
                                 })
                         .collect(JsonArray::new, JsonArray::add, JsonArray::add);
 
@@ -301,7 +291,10 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                                     .get(Long.valueOf(jobId));
             this.prepareResponse(
                     command,
-                    convertToJson(finishedJobState, finishedJobMetrics, finishedJobDAGInfo));
+                    getJobInfoJson(
+                            finishedJobState,
+                            finishedJobMetrics.toJsonString(),
+                            finishedJobDAGInfo));
         } else {
             this.prepareResponse(command, new JsonObject().add(RestConstant.JOB_ID, jobId));
         }
@@ -353,7 +346,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                 this.textCommandService.getNode().getNodeExtension().createExtensionServices();
         SeaTunnelServer seaTunnelServer =
                 (SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
-        if (!seaTunnelServer.isMasterNode() && shouldBeMaster) {
+        if (shouldBeMaster && !seaTunnelServer.isMasterNode()) {
             return null;
         }
         return seaTunnelServer;
@@ -374,7 +367,11 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                                         .getSerializationService()
                                         .toObject(jobInfo.getJobImmutableInformation()));
 
-        ClassLoaderService classLoaderService = getSeaTunnelServer(false).getClassLoaderService();
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
+        ClassLoaderService classLoaderService =
+                seaTunnelServer == null
+                        ? getSeaTunnelServer(false).getClassLoaderService()
+                        : seaTunnelServer.getClassLoaderService();
         ClassLoader classLoader =
                 classLoaderService.getClassLoader(
                         jobId, jobImmutableInformation.getPluginJarsUrls());
@@ -385,7 +382,6 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                         jobImmutableInformation.getLogicalDag());
         classLoaderService.releaseClassLoader(jobId, jobImmutableInformation.getPluginJarsUrls());
 
-        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
         String jobMetrics;
         JobStatus jobStatus;
         if (seaTunnelServer == null) {
@@ -416,8 +412,9 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                         JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
                 .add(
                         RestConstant.CREATE_TIME,
-                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                .format(new Date(jobImmutableInformation.getCreateTime())))
+                        DateTimeUtils.toString(
+                                jobImmutableInformation.getCreateTime(),
+                                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
                 .add(
                         RestConstant.PLUGIN_JARS_URLS,
@@ -439,53 +436,24 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
         return jobInfoJson;
     }
 
-    private JsonObject convertToJson(
-            JobState finishedJobState,
-            JobMetrics finishedJobMetrics,
-            JobDAGInfo finishedJobDAGInfo) {
-        JsonObject jobInfoJson = new JsonObject();
-        jobInfoJson
-                .add(RestConstant.JOB_ID, String.valueOf(finishedJobState.getJobId()))
-                .add(RestConstant.JOB_NAME, finishedJobState.getJobName())
-                .add(RestConstant.JOB_STATUS, finishedJobState.getJobStatus().toString())
-                .add(RestConstant.ERROR_MSG, finishedJobState.getErrorMessage())
-                .add(
-                        RestConstant.CREATE_TIME,
-                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                .format(new Date(finishedJobState.getSubmitTime())))
-                .add(
-                        RestConstant.FINISH_TIME,
-                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                .format(new Date(finishedJobState.getFinishTime())))
-                .add(
-                        RestConstant.JOB_DAG,
-                        Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
-                .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                .add(
-                        RestConstant.METRICS,
-                        JsonUtil.toJsonObject(getJobMetrics(finishedJobMetrics.toJsonString())));
-        return jobInfoJson;
-    }
-
-    private JsonObject convertToJson(
-            JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
-        JsonObject jobInfoJson = new JsonObject();
-        jobInfoJson
-                .add(RestConstant.JOB_ID, String.valueOf(jobId))
+    private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) {
+        return new JsonObject()
+                .add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
                 .add(RestConstant.JOB_NAME, jobState.getJobName())
                 .add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
                 .add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
                 .add(
                         RestConstant.CREATE_TIME,
-                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                .format(new Date(jobState.getSubmitTime())))
+                        DateTimeUtils.toString(
+                                jobState.getSubmitTime(),
+                                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(
                         RestConstant.FINISH_TIME,
-                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                .format(new Date(jobState.getFinishTime())))
-                .add(RestConstant.JOB_DAG, jobDAGInfo)
+                        DateTimeUtils.toString(
+                                jobState.getFinishTime(),
+                                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
+                .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+                .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
                 .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
-
-        return jobInfoJson;
     }
 }