You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/07 09:25:10 UTC
[incubator-inlong] branch master updated: [INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new afa48f229 [INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)
afa48f229 is described below
commit afa48f2295814ea3bfb20eac3295bcb837422a35
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Thu Apr 7 17:25:03 2022 +0800
[INLONG-3538][Manager] Adjust the mode of getting sort URL (#3554)
---
.../inlong/manager/plugin/flink/FlinkService.java | 38 ++++++++++++++++++++--
.../plugin/flink/IntergrationTaskRunner.java | 8 ++++-
.../inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 ++
.../plugin/listener/DeleteSortListener.java | 17 +++++-----
.../plugin/listener/RestartSortListener.java | 11 +++++--
.../plugin/listener/StartupSortListener.java | 5 ++-
.../plugin/listener/SuspendSortListener.java | 19 ++++++-----
.../inlong/manager/plugin/util/FlinkUtils.java | 34 +++++++------------
.../plugin/listener/DeleteSortListenerTest.java | 2 +-
.../plugin/listener/RestartSortListenerTest.java | 4 +--
.../plugin/listener/SuspendSortListenerTest.java | 2 +-
11 files changed, 92 insertions(+), 50 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e1d3f54c4..23189e977 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -49,12 +49,19 @@ import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@Slf4j
public class FlinkService {
+
+ private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
+
private final FlinkConfig flinkConfig;
private final Integer port;
private final Integer jobManagerPort;
@@ -63,16 +70,41 @@ public class FlinkService {
private final Integer parallelism;
private final String savepointDirectory;
- public FlinkService() throws IOException {
+ public FlinkService(String endpoint) throws IOException {
FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
flinkConfig = flinkConfiguration.getFlinkConfig();
- address = flinkConfig.getAddress();
- port = flinkConfig.getPort();
jobManagerPort = flinkConfig.getJobManagerPort();
parallelism = flinkConfig.getParallelism();
savepointDirectory = flinkConfig.getSavepointDirectory();
+ if (StringUtils.isEmpty(endpoint)) {
+ address = flinkConfig.getAddress();
+ port = flinkConfig.getPort();
+ } else {
+ address = translateFromEndpont(endpoint).get("address");
+ port = Integer.valueOf(translateFromEndpont(endpoint).get("port"));
+ }
urlHead = Constants.HTTP_URL + address + Constants.SEPARATOR + port;
+ }
+ /**
+ * translate Endpont to address & port
+ *
+ * @param endpoint
+ * @return
+ */
+ private Map<String, String> translateFromEndpont(String endpoint) {
+ Map<String, String> map = new HashMap<>(2);
+ try {
+ Matcher matcher = numberPattern.matcher(endpoint);
+ while (matcher.find()) {
+ map.put("address", matcher.group(1));
+ map.put("port", matcher.group(2));
+ return map;
+ }
+ } catch (Exception e) {
+ log.error("fetch addres:port fail", e.getMessage());
+ }
+ return map;
}
/**
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
index 68bf92e93..3516304db 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
@@ -170,7 +170,13 @@ public class IntergrationTaskRunner implements Runnable {
String status = jobStatus.get("id").asText();
String savepointPath = operation.get("location").asText();
flinkInfo.setSavepointPath(savepointPath);
- log.info("the jobId :{} status: {} ", flinkInfo.getJobId(),status);
+ log.info("the jobId {} status: {} ", flinkInfo.getJobId(),status);
+ }
+ JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId());
+ if (jobStatus.isTerminalState()) {
+ log.info("stop job {}, status: {}, success in backend", flinkInfo.getJobId(), jobStatus);
+ } else {
+ log.info("stop job {}, status: {}, fail in backend", flinkInfo.getJobId(), jobStatus);
}
} catch (Exception e) {
String msg = String.format("stop job %s failed in backend exception[%s]", flinkInfo.getJobId(),
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index ce904cb0f..c3e767512 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -25,6 +25,8 @@ import java.util.List;
@Data
public class FlinkInfo {
+ private String endpoint;
+
private String jobName;
private List<InlongStreamInfo> inlongStreamInfoList;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 4ffd3feb3..3623f7fe9 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -68,17 +69,17 @@ public class DeleteSortListener implements SortOperateListener {
Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
new TypeReference<Map<String, String>>(){});
kvConf.putAll(result);
- if (StringUtils.isEmpty(kvConf.get(InlongGroupSettings.SORT_JOB_ID))) {
- String message = String.format("inlongGroupId:%s not add deleteProcess listener,SORT_JOB_ID is empty",
- inlongGroupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
+
FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+ flinkInfo.setJobId(jobId);
+
+ String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
- FlinkService flinkService = new FlinkService();
+ FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
try {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 2eab5739d..00e8f6431 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.plugin.flink.Constants;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
@@ -99,10 +100,16 @@ public class RestartSortListener implements SortOperateListener {
}
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
flinkInfo.setJobName(jobName);
- FlinkService flinkService = new FlinkService();
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+ flinkInfo.setJobId(jobId);
+
+ String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
+ FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
managerFlinkTask.genPath(flinkInfo,dataFlow.toString());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index cd4bb89ee..abca65e64 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -92,12 +92,15 @@ public class StartupSortListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
parseDataflow(dataFlow, flinkInfo);
+ String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
flinkInfo.setInlongStreamInfoList(groupResourceProcessForm.getInlongStreamInfoList());
- FlinkService flinkService = new FlinkService();
+ FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
managerFlinkTask.genPath(flinkInfo, dataFlow.toString());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 498b6be9d..9c4675549 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -66,15 +67,17 @@ public class SuspendSortListener implements SortOperateListener {
Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
new TypeReference<Map<String, String>>(){});
kvConf.putAll(result);
- if (StringUtils.isEmpty(kvConf.get(InlongGroupSettings.SORT_JOB_ID))) {
- String message = String.format("inlongGroupId:%s not add suspendProcess listener,SORT_JOB_ID is empty",
- inlongGroupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
+
FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(kvConf.get(InlongGroupSettings.SORT_JOB_ID));
- FlinkService flinkService = new FlinkService();
+
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ Preconditions.checkNotEmpty(jobId, "sort jobId is empty");
+ flinkInfo.setJobId(jobId);
+
+ String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
+ FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
try {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 098f3c037..7b0384e44 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.plugin.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
-import org.apache.inlong.manager.plugin.flink.dto.LoginConf;
+import org.apache.commons.lang3.StringUtils;
import java.io.BufferedWriter;
import java.io.File;
@@ -37,7 +37,6 @@ import java.util.regex.Pattern;
@Slf4j
public class FlinkUtils {
public static final String BASE_DIRECTORY = "config";
- private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
/**
*/
@@ -106,6 +105,16 @@ public class FlinkUtils {
return null;
}
+ /**
+ * get value
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static String getValue(String key, String defaultValue) {
+ return StringUtils.isNotEmpty(key) ? key : defaultValue;
+ }
+
/**
* @param name
* @return
@@ -139,27 +148,6 @@ public class FlinkUtils {
return true;
}
- /**
- * sort_url to address&port
- * @param endpoint
- * @return
- */
- @Deprecated
- public static LoginConf translateFromEndpont(String endpoint) {
- LoginConf loginConf = new LoginConf();
- try {
- Matcher matcher = numberPattern.matcher(endpoint);
- while (matcher.find()) {
- loginConf.setRestAddress(matcher.group(1));
- loginConf.setRestPort(Integer.valueOf(matcher.group(2)));
- return loginConf;
- }
- } catch (Exception e) {
- log.error("fetch addres:port fail", e.getMessage());
- }
- return loginConf;
- }
-
/**
* delete configuration file
* @param name
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index e7133e2ca..94008dcba 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -58,7 +58,7 @@ public class DeleteSortListenerTest {
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
- inlongGroupExtInfo5.setKeyValue("209f30999d381d669378b476a4db59e9");
+ inlongGroupExtInfo5.setKeyValue("d7e613fb18876f173ec5ba17465fae64");
inlongGroupExtInfos.add(inlongGroupExtInfo5);
InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
index 749970d7e..6ad03e1ec 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
@@ -43,7 +43,7 @@ public class RestartSortListenerTest {
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
- inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
+ inlongGroupExtInfo1.setKeyValue("127.0.0.1:8081");
List<InlongGroupExtInfo> inlongGroupExtInfoList = new ArrayList<>();
inlongGroupExtInfoList.add(inlongGroupExtInfo1);
@@ -57,7 +57,7 @@ public class RestartSortListenerTest {
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
- inlongGroupExtInfo5.setKeyValue("52c8977bd28365cdd23ad4ea4cd767a4");
+ inlongGroupExtInfo5.setKeyValue("efdc85a977e72e0d9a99170d78f03ddb");
inlongGroupExtInfoList.add(inlongGroupExtInfo5);
InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index bbbfd10ea..32f7f88ef 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -57,7 +57,7 @@ public class SuspendSortListenerTest {
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
- inlongGroupExtInfo5.setKeyValue("ea405ab424cfc35ae9be93df8ea87917");
+ inlongGroupExtInfo5.setKeyValue("118a77173079c7264f08f178da4473fa");
inlongGroupExtInfos.add(inlongGroupExtInfo5);
inlongGroupInfo.setExtList(inlongGroupExtInfos);