You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/07 09:25:10 UTC

[incubator-inlong] branch master updated: [INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new afa48f229 [INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)
afa48f229 is described below

commit afa48f2295814ea3bfb20eac3295bcb837422a35
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Thu Apr 7 17:25:03 2022 +0800

    [INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)
---
 .../inlong/manager/plugin/flink/FlinkService.java  | 38 ++++++++++++++++++++--
 .../plugin/flink/IntergrationTaskRunner.java       |  8 ++++-
 .../inlong/manager/plugin/flink/dto/FlinkInfo.java |  2 ++
 .../plugin/listener/DeleteSortListener.java        | 17 +++++-----
 .../plugin/listener/RestartSortListener.java       | 11 +++++--
 .../plugin/listener/StartupSortListener.java       |  5 ++-
 .../plugin/listener/SuspendSortListener.java       | 19 ++++++-----
 .../inlong/manager/plugin/util/FlinkUtils.java     | 34 +++++++------------
 .../plugin/listener/DeleteSortListenerTest.java    |  2 +-
 .../plugin/listener/RestartSortListenerTest.java   |  4 +--
 .../plugin/listener/SuspendSortListenerTest.java   |  2 +-
 11 files changed, 92 insertions(+), 50 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e1d3f54c4..23189e977 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -49,12 +49,19 @@ import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 public class FlinkService {
+
+    private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
+
     private final FlinkConfig flinkConfig;
     private final Integer port;
     private final Integer jobManagerPort;
@@ -63,16 +70,41 @@ public class FlinkService {
     private final Integer parallelism;
     private final String savepointDirectory;
 
-    public FlinkService() throws IOException {
+    public FlinkService(String endpoint) throws IOException {
         FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
         flinkConfig = flinkConfiguration.getFlinkConfig();
-        address = flinkConfig.getAddress();
-        port = flinkConfig.getPort();
         jobManagerPort = flinkConfig.getJobManagerPort();
         parallelism = flinkConfig.getParallelism();
         savepointDirectory = flinkConfig.getSavepointDirectory();
+        if (StringUtils.isEmpty(endpoint)) {
+            address = flinkConfig.getAddress();
+            port = flinkConfig.getPort();
+        } else {
+            address = translateFromEndpont(endpoint).get("address");
+            port = Integer.valueOf(translateFromEndpont(endpoint).get("port"));
+        }
         urlHead = Constants.HTTP_URL +  address + Constants.SEPARATOR + port;
+    }
 
+    /**
+     * translate  Endpont to address & port
+     *
+     * @param endpoint
+     * @return
+     */
+    private Map<String, String> translateFromEndpont(String endpoint) {
+        Map<String, String> map = new HashMap<>(2);
+        try {
+            Matcher matcher = numberPattern.matcher(endpoint);
+            while (matcher.find()) {
+                map.put("address", matcher.group(1));
+                map.put("port", matcher.group(2));
+                return map;
+            }
+        } catch (Exception e) {
+            log.error("fetch addres:port fail", e.getMessage());
+        }
+        return map;
     }
 
     /**
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
index 68bf92e93..3516304db 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
@@ -170,7 +170,13 @@ public class IntergrationTaskRunner implements Runnable {
                         String status = jobStatus.get("id").asText();
                         String savepointPath = operation.get("location").asText();
                         flinkInfo.setSavepointPath(savepointPath);
-                        log.info("the jobId :{} status: {} ", flinkInfo.getJobId(),status);
+                        log.info("the jobId {} status: {} ", flinkInfo.getJobId(),status);
+                    }
+                    JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId());
+                    if (jobStatus.isTerminalState()) {
+                        log.info("stop job {}, status: {}, success in backend", flinkInfo.getJobId(), jobStatus);
+                    } else {
+                        log.info("stop job {}, status: {}, fail in backend", flinkInfo.getJobId(), jobStatus);
                     }
                 } catch (Exception e) {
                     String msg = String.format("stop job %s failed in backend exception[%s]", flinkInfo.getJobId(),
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index ce904cb0f..c3e767512 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -25,6 +25,8 @@ import java.util.List;
 @Data
 public class FlinkInfo {
 
+    private String endpoint;
+
     private String jobName;
 
     private List<InlongStreamInfo> inlongStreamInfoList;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 4ffd3feb3..3623f7fe9 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -68,17 +69,17 @@ public class DeleteSortListener implements SortOperateListener {
         Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
                 new TypeReference<Map<String, String>>(){});
         kvConf.putAll(result);
-        if (StringUtils.isEmpty(kvConf.get(InlongGroupSettings.SORT_JOB_ID))) {
-            String message = String.format("inlongGroupId:%s not add deleteProcess listener,SORT_JOB_ID is empty",
-                    inlongGroupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
+
         FlinkInfo flinkInfo = new FlinkInfo();
 
-        flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+        flinkInfo.setJobId(jobId);
+
+        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
 
-        FlinkService flinkService = new FlinkService();
+        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
 
         try {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 2eab5739d..00e8f6431 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.plugin.flink.Constants;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
@@ -99,10 +100,16 @@ public class RestartSortListener implements SortOperateListener {
         }
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
         flinkInfo.setJobName(jobName);
 
-        FlinkService flinkService = new FlinkService();
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+        flinkInfo.setJobId(jobId);
+
+        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
+
+        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
         managerFlinkTask.genPath(flinkInfo,dataFlow.toString());
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index cd4bb89ee..abca65e64 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -92,12 +92,15 @@ public class StartupSortListener implements SortOperateListener {
         FlinkInfo flinkInfo = new FlinkInfo();
         parseDataflow(dataFlow, flinkInfo);
 
+        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
+
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
 
         flinkInfo.setInlongStreamInfoList(groupResourceProcessForm.getInlongStreamInfoList());
 
-        FlinkService flinkService = new FlinkService();
+        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
         managerFlinkTask.genPath(flinkInfo, dataFlow.toString());
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 498b6be9d..9c4675549 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -66,15 +67,17 @@ public class SuspendSortListener implements SortOperateListener {
         Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
                 new TypeReference<Map<String, String>>(){});
         kvConf.putAll(result);
-        if (StringUtils.isEmpty(kvConf.get(InlongGroupSettings.SORT_JOB_ID))) {
-            String message = String.format("inlongGroupId:%s not add suspendProcess listener,SORT_JOB_ID is empty",
-                    inlongGroupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
+
         FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
-        FlinkService flinkService = new FlinkService();
+
+        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+        flinkInfo.setJobId(jobId);
+
+        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        flinkInfo.setEndpoint(sortUrl);
+
+        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
 
         try {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 098f3c037..7b0384e44 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.plugin.util;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
-import org.apache.inlong.manager.plugin.flink.dto.LoginConf;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -37,7 +37,6 @@ import java.util.regex.Pattern;
 @Slf4j
 public class FlinkUtils {
     public static final String BASE_DIRECTORY = "config";
-    private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
 
     /**
      */
@@ -106,6 +105,16 @@ public class FlinkUtils {
         return null;
     }
 
+    /**
+     * get value
+     * @param key
+     * @param defaultValue
+     * @return
+     */
+    public static String getValue(String key, String defaultValue) {
+        return StringUtils.isNotEmpty(key) ? key : defaultValue;
+    }
+
     /**
      * @param name
      * @return
@@ -139,27 +148,6 @@ public class FlinkUtils {
         return true;
     }
 
-    /**
-     * sort_url to address&port
-     * @param endpoint
-     * @return
-     */
-    @Deprecated
-    public static LoginConf translateFromEndpont(String endpoint) {
-        LoginConf loginConf = new LoginConf();
-        try {
-            Matcher matcher = numberPattern.matcher(endpoint);
-            while (matcher.find()) {
-                loginConf.setRestAddress(matcher.group(1));
-                loginConf.setRestPort(Integer.valueOf(matcher.group(2)));
-                return loginConf;
-            }
-        } catch (Exception e) {
-            log.error("fetch addres:port fail", e.getMessage());
-        }
-        return loginConf;
-    }
-
     /**
      * delete configuration file
      * @param name
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index e7133e2ca..94008dcba 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -58,7 +58,7 @@ public class DeleteSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
         inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
-        inlongGroupExtInfo5.setKeyValue("209f30999d381d669378b476a4db59e9");
+        inlongGroupExtInfo5.setKeyValue("d7e613fb18876f173ec5ba17465fae64");
         inlongGroupExtInfos.add(inlongGroupExtInfo5);
 
         InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
index 749970d7e..6ad03e1ec 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
@@ -43,7 +43,7 @@ public class RestartSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
         inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
-        inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
+        inlongGroupExtInfo1.setKeyValue("127.0.0.1:8081");
         List<InlongGroupExtInfo> inlongGroupExtInfoList = new ArrayList<>();
         inlongGroupExtInfoList.add(inlongGroupExtInfo1);
 
@@ -57,7 +57,7 @@ public class RestartSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
         inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
-        inlongGroupExtInfo5.setKeyValue("52c8977bd28365cdd23ad4ea4cd767a4");
+        inlongGroupExtInfo5.setKeyValue("efdc85a977e72e0d9a99170d78f03ddb");
         inlongGroupExtInfoList.add(inlongGroupExtInfo5);
 
         InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index bbbfd10ea..32f7f88ef 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -57,7 +57,7 @@ public class SuspendSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
         inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
-        inlongGroupExtInfo5.setKeyValue("ea405ab424cfc35ae9be93df8ea87917");
+        inlongGroupExtInfo5.setKeyValue("118a77173079c7264f08f178da4473fa");
         inlongGroupExtInfos.add(inlongGroupExtInfo5);
 
         inlongGroupInfo.setExtList(inlongGroupExtInfos);