You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/15 15:25:06 UTC
[incubator-inlong] branch master updated: [INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3741)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b219a4974 [INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3741)
b219a4974 is described below
commit b219a4974268aef0f7f15af80144a6afaa1b5b58
Author: healchow <he...@gmail.com>
AuthorDate: Fri Apr 15 23:24:59 2022 +0800
[INLONG-3737][Manager] Workflow should fail when the plugin occurred exception (#3741)
---
.../inlong/manager/common/util/JsonUtils.java | 31 +--
.../manager/plugin/flink/FlinkOperation.java | 228 ++++++++++++++++++++
.../inlong/manager/plugin/flink/FlinkService.java | 221 +++++++++-----------
...nTaskRunner.java => IntegrationTaskRunner.java} | 43 ++--
.../manager/plugin/flink/ManagerFlinkTask.java | 231 ---------------------
.../manager/plugin/flink/dto/JarEntryInfo.java | 2 +-
.../{JarRunRequestbody.java => JarRunRequest.java} | 4 +-
.../inlong/manager/plugin/flink/dto/LoginConf.java | 2 +-
...uestBody.java => StopWithSavepointRequest.java} | 2 +-
.../plugin/flink/enums/BusinessExceptionDesc.java | 37 ----
.../plugin/flink/{ => enums}/Constants.java | 2 +-
.../plugin/listener/DeleteSortListener.java | 67 +++---
.../plugin/listener/RestartSortListener.java | 89 ++++----
.../plugin/listener/StartupSortListener.java | 120 +++++------
.../plugin/listener/SuspendSortListener.java | 66 +++---
.../manager/plugin/util/FlinkConfiguration.java | 82 ++++----
.../inlong/manager/plugin/util/FlinkUtils.java | 39 +---
.../plugin/listener/DeleteSortListenerTest.java | 2 +-
.../workflow/util/WorkflowFormParserUtils.java | 4 +-
19 files changed, 606 insertions(+), 666 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
index 1d0dbb269..5aa6bccb7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
@@ -26,25 +26,26 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.exceptions.JsonException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
* JSON utils
*/
@Slf4j
public class JsonUtils {
- public static final ObjectMapper MAPPER = new ObjectMapper();
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static {
- MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
- MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
- MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
+ OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
/**
@@ -61,7 +62,7 @@ public class JsonUtils {
return (String) obj;
}
try {
- return MAPPER.writeValueAsString(obj);
+ return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("JSON transform error: {}", obj, e);
throw new JsonException("JSON transform error");
@@ -78,7 +79,7 @@ public class JsonUtils {
*/
public static <T> T parse(String json, TypeReference<T> type) {
try {
- return MAPPER.readValue(json, type);
+ return OBJECT_MAPPER.readValue(json, type);
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
@@ -96,7 +97,7 @@ public class JsonUtils {
return null;
}
try {
- return MAPPER.readTree(json);
+ return OBJECT_MAPPER.readTree(json);
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
@@ -113,7 +114,7 @@ public class JsonUtils {
*/
public static <T> T parse(String json, Class<T> tClass) {
try {
- return MAPPER.readValue(json, tClass);
+ return OBJECT_MAPPER.readValue(json, tClass);
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
@@ -122,7 +123,7 @@ public class JsonUtils {
public static <T> T parse(String json, JavaType javaType) {
try {
- return MAPPER.readValue(json, javaType);
+ return OBJECT_MAPPER.readValue(json, javaType);
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
@@ -139,7 +140,8 @@ public class JsonUtils {
*/
public static <E> List<E> parseList(String json, Class<E> eClass) {
try {
- return MAPPER.readValue(json, MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
+ return OBJECT_MAPPER.readValue(json,
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
@@ -158,7 +160,8 @@ public class JsonUtils {
*/
public static <K, V> Map<K, V> parseMap(String json, Class<K> kClass, Class<V> vClass) {
try {
- return MAPPER.readValue(json, MAPPER.getTypeFactory().constructMapType(Map.class, kClass, vClass));
+ return OBJECT_MAPPER.readValue(json,
+ OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, kClass, vClass));
} catch (IOException e) {
log.error("JSON transform error: {}", json, e);
throw new JsonException("JSON transform error");
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
new file mode 100644
index 000000000..c061f5590
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
+import org.apache.inlong.manager.plugin.util.FlinkUtils;
+
+import java.io.File;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
+
+/**
+ * Flink task operation.
+ */
+@Slf4j
+public class FlinkOperation {
+
+ private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
+ + "or task already terminated or savepoint path is null"; // format string (arg: job id) appended to start/restart/stop failures
+ private static final String INLONG_MANAGER = "inlong-manager"; // marker searched for in the code-source path by genPath
+ private static final String INLONG_SORT = "inlong-sort"; // appended to the path prefix before the marker to form the sort base path
+ private static final String SORT_JAR_PATTERN = "^sort-single-tenant.*jar$"; // pattern passed to findFiles to locate the sort jar
+
+ private final FlinkService flinkService; // used to query job details and handed to every IntegrationTaskRunner
+
+ public FlinkOperation(FlinkService flinkService) {
+ this.flinkService = flinkService;
+ }
+
+    /**
+     * Start the Flink job.
+     *
+     * <p>If the job id of {@code flinkInfo} is empty, a new job is submitted with commit type
+     * {@code START_NOW}; otherwise the existing job is resumed with commit type {@code RESUME}.
+     * The call waits on the returned {@link Future} until the task runner has finished.
+     *
+     * @param flinkInfo the Flink job info, providing the job id and the savepoint path
+     * @throws Exception if the job to resume is missing or terminated, has no savepoint path,
+     *         or if submitting/running the task failed; the original failure is kept as the cause
+     */
+    public void start(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        try {
+            TaskCommitType commitType;
+            if (StringUtils.isEmpty(jobId)) {
+                // Start a new task without savepoint
+                commitType = TaskCommitType.START_NOW;
+            } else {
+                // Restore an old task with savepoint
+                boolean noSavepoint = isNullOrTerminated(jobId) || StringUtils.isEmpty(flinkInfo.getSavepointPath());
+                if (noSavepoint) {
+                    String message = String.format("restore job failed, as " + JOB_TERMINATED_MSG, jobId);
+                    log.error(message);
+                    throw new Exception(message);
+                }
+                commitType = TaskCommitType.RESUME;
+            }
+
+            IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo,
+                    commitType.getCode());
+            Future<?> future = TaskRunService.submit(taskRunner);
+            future.get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt flag so that callers can still observe the interruption
+                Thread.currentThread().interrupt();
+            }
+            log.warn("submit flink job failed for {}", flinkInfo, e);
+            // Keep the original exception as the cause instead of only copying its message
+            throw new Exception("submit flink job failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Build the local paths needed by the Flink job and store them into {@code flinkInfo}.
+     *
+     * <p>Starting from the code-source location of this class, the part of the path before
+     * {@code inlong-manager} plus {@code inlong-sort} is used as the base path in which the sort jar
+     * is searched. The dataflow is written via {@link FlinkUtils#writeConfigToFile} into the parent
+     * directory of the code-source location, under the job name.
+     *
+     * @param flinkInfo the Flink job info, receives the local jar path and the local config path
+     * @param dataflow the dataflow content to write to the config file
+     * @throws Exception if {@code inlong-manager} is not part of the path, the sort base path does
+     *         not exist, or writing the dataflow failed
+     */
+    public void genPath(FlinkInfo flinkInfo, String dataflow) throws Exception {
+        String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+        log.info("gen path from {}", path);
+
+        int index = path.indexOf(INLONG_MANAGER);
+        if (index == -1) {
+            throw new Exception(INLONG_MANAGER + " path not found in " + path);
+        }
+
+        // Take the prefix from the untruncated path: the index was computed on it, and it may lie
+        // beyond the end of the parent directory when the marker only occurs in the last segment
+        String basePath = path.substring(0, index) + INLONG_SORT;
+        File file = new File(basePath);
+        if (!file.exists()) {
+            String message = String.format("file path [%s] not found", basePath);
+            log.error(message);
+            throw new Exception(message);
+        }
+
+        String jarPath = findFiles(basePath, SORT_JAR_PATTERN);
+        flinkInfo.setLocalJarPath(jarPath);
+        log.info("get sort jar path success, path: {}", jarPath);
+
+        // A URL path is always separated by '/', even on platforms where File.separator differs
+        int separatorIndex = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
+        String confDir = separatorIndex < 0 ? path : path.substring(0, separatorIndex);
+        if (FlinkUtils.writeConfigToFile(confDir, flinkInfo.getJobName(), dataflow)) {
+            flinkInfo.setLocalConfPath(confDir + File.separator + flinkInfo.getJobName());
+        } else {
+            String message = String.format("write dataflow to %s failed", confDir);
+            log.error(message + ", dataflow: {}", dataflow);
+            throw new Exception(message);
+        }
+    }
+
+    /**
+     * Restart the Flink job identified by the job id of the given info.
+     *
+     * @param flinkInfo the Flink job info
+     * @throws Exception if the job was not found or already terminated, or if the task failed
+     */
+    public void restart(FlinkInfo flinkInfo) throws Exception {
+        final String id = flinkInfo.getJobId();
+        if (isNullOrTerminated(id)) {
+            final String errMsg = String.format("restart job failed, as " + JOB_TERMINATED_MSG, id);
+            log.error(errMsg);
+            throw new Exception(errMsg);
+        }
+
+        // Hand the restart over to the task runner and wait for it to finish
+        IntegrationTaskRunner runner = new IntegrationTaskRunner(flinkService, flinkInfo,
+                TaskCommitType.RESTART.getCode());
+        TaskRunService.submit(runner).get();
+    }
+
+    /**
+     * Stop the Flink job identified by the job id of the given info.
+     *
+     * @param flinkInfo the Flink job info
+     * @throws Exception if the job was not found or already terminated, or if the task failed
+     */
+    public void stop(FlinkInfo flinkInfo) throws Exception {
+        final String id = flinkInfo.getJobId();
+        if (isNullOrTerminated(id)) {
+            final String errMsg = String.format("stop job failed, as " + JOB_TERMINATED_MSG, id);
+            log.error(errMsg);
+            throw new Exception(errMsg);
+        }
+
+        // Hand the stop over to the task runner and wait for it to finish
+        IntegrationTaskRunner runner = new IntegrationTaskRunner(flinkService, flinkInfo,
+                TaskCommitType.STOP.getCode());
+        TaskRunService.submit(runner).get();
+    }
+
+    /**
+     * Delete the Flink job by submitting a task runner with commit type {@code DELETE}.
+     *
+     * @param flinkInfo the Flink job info, providing the job id
+     * @throws Exception if the job detail was not found, the job is already in a terminal state,
+     *         or the task failed
+     */
+    public void delete(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+        if (jobDetailsInfo == null) {
+            throw new Exception(String.format("delete job failed as the job not found for %s", jobId));
+        }
+
+        JobStatus jobStatus = jobDetailsInfo.getJobStatus();
+        if (jobStatus != null && jobStatus.isTerminalState()) {
+            // Append the scope in both cases, so the job id is never dropped from the message
+            String scope = jobStatus.isGloballyTerminalState() ? " globally" : " locally";
+            String message = String.format("not support delete %s as the task was terminated", jobId) + scope;
+            throw new Exception(message);
+        }
+
+        Future<?> future = TaskRunService.submit(
+                new IntegrationTaskRunner(flinkService, flinkInfo,
+                        TaskCommitType.DELETE.getCode()));
+        future.get();
+    }
+
+    /**
+     * Poll the status of the Flink job every 5 seconds until it is {@code RUNNING}.
+     *
+     * @param flinkInfo the Flink job info, providing the job id and the exception flag
+     * @throws Exception if the info is already marked as failed, the job id is blank, the job detail
+     *         or job status cannot be fetched, the job reached a terminal state, or the waiting
+     *         thread was interrupted
+     */
+    public void pollJobStatus(FlinkInfo flinkInfo) throws Exception {
+        if (flinkInfo.isException()) {
+            throw new BusinessException("startup failed: " + flinkInfo.getExceptionMsg());
+        }
+        String jobId = flinkInfo.getJobId();
+        if (StringUtils.isBlank(jobId)) {
+            log.error("job id cannot empty for {}", flinkInfo);
+            throw new Exception("job id cannot empty");
+        }
+
+        try {
+            while (true) {
+                JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+                if (jobDetailsInfo == null) {
+                    log.error("job detail not found by {}", jobId);
+                    throw new Exception(String.format("job detail not found by %s", jobId));
+                }
+
+                JobStatus jobStatus = jobDetailsInfo.getJobStatus();
+                if (jobStatus == null) {
+                    log.error("job status not found by {}", jobId);
+                    throw new Exception(String.format("job status not found by %s", jobId));
+                }
+                if (jobStatus.isTerminalState()) {
+                    log.error("job was terminated for {}, exception: {}", jobId, flinkInfo.getExceptionMsg());
+                    throw new Exception("job was terminated for " + jobId);
+                }
+
+                if (jobStatus == RUNNING) {
+                    log.info("job status is Running for {}", jobDetailsInfo);
+                    return;
+                }
+                log.info("job was not Running for {}", jobDetailsInfo);
+                // Wait 5 seconds between two polls
+                TimeUnit.SECONDS.sleep(5);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so that callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            log.error("poll job status error for {}, exception: ", flinkInfo, e);
+            throw e;
+        } catch (Exception e) {
+            // Propagate the failure: swallowing it here would keep the loop spinning forever
+            log.error("poll job status error for {}, exception: ", flinkInfo, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Tell whether the job with the given id is unusable: its detail or status is missing,
+     * or its status is a terminal state.
+     *
+     * @param jobId the id of the Flink job to look up
+     * @return true if the job detail or job status is null, or the status is terminal
+     * @throws Exception if fetching the job detail failed
+     */
+    private boolean isNullOrTerminated(String jobId) throws Exception {
+        JobDetailsInfo detail = flinkService.getJobDetail(jobId);
+        if (detail == null || detail.getJobStatus() == null) {
+            log.warn("job detail or job status was null for [{}]", jobId);
+            return true;
+        }
+
+        boolean inTerminalState = detail.getJobStatus().isTerminalState();
+        log.warn("job terminated state was [{}] for [{}]", inTerminalState, detail);
+        return inTerminalState;
+    }
+
+}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 6dbe835e0..61c8d1c7d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -31,14 +31,15 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequestBody;
+import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -50,219 +51,191 @@ import java.util.regex.Pattern;
@Slf4j
public class FlinkService {
- private static final Pattern numberPattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+)\\:(\\d+)");
+ private static final Pattern IP_PORT_PATTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
private final FlinkConfig flinkConfig;
- private final Integer port;
- private final Integer jobManagerPort;
- private final String address;
private final Integer parallelism;
private final String savepointDirectory;
+ private final Configuration configuration;
- public FlinkService(String endpoint) throws IOException {
+ public FlinkService(String endpoint) throws Exception {
FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
flinkConfig = flinkConfiguration.getFlinkConfig();
- jobManagerPort = flinkConfig.getJobManagerPort();
parallelism = flinkConfig.getParallelism();
savepointDirectory = flinkConfig.getSavepointDirectory();
+
+ configuration = new Configuration();
+ Integer jobManagerPort = flinkConfig.getJobManagerPort();
+ configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+ Integer port;
+ String address;
if (StringUtils.isEmpty(endpoint)) {
address = flinkConfig.getAddress();
port = flinkConfig.getPort();
} else {
- address = translateFromEndpont(endpoint).get("address");
- port = Integer.valueOf(translateFromEndpont(endpoint).get("port"));
+ Map<String, String> ipPort = translateFromEndpoint(endpoint);
+ if (ipPort.isEmpty()) {
+ throw new BusinessException("get address:port failed from endpoint " + endpoint);
+ }
+ address = ipPort.get("address");
+ port = Integer.valueOf(ipPort.get("port"));
}
+ configuration.setString(JobManagerOptions.ADDRESS, address);
+ configuration.setInteger(RestOptions.PORT, port);
}
/**
- * translate Endpont to address & port
- *
- * @param endpoint
- * @return
+ * Translate the Endpoint to address & port
*/
- private Map<String, String> translateFromEndpont(String endpoint) {
+ private Map<String, String> translateFromEndpoint(String endpoint) throws Exception {
Map<String, String> map = new HashMap<>(2);
- try {
- Matcher matcher = numberPattern.matcher(endpoint);
- while (matcher.find()) {
- map.put("address", matcher.group(1));
- map.put("port", matcher.group(2));
- return map;
- }
- } catch (Exception e) {
- log.error("fetch address:port fail: ", e);
+ Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
+ if (matcher.find()) {
+ map.put("address", matcher.group(1));
+ map.put("port", matcher.group(2));
+ return map;
+ } else {
+ throw new Exception("endpoint [" + endpoint + "] was not match address:port");
}
- return map;
}
/**
- * get flinkConfig
- * @return
+ * Get Flink config.
*/
public FlinkConfig getFlinkConfig() {
return flinkConfig;
}
/**
- * get flink Client
- * @return
- * @throws Exception
+ * Get the Flink Client.
*/
public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
- Configuration configuration = initConfiguration();
- RestClusterClient<StandaloneClusterId> client =
- new RestClusterClient<StandaloneClusterId>(configuration, StandaloneClusterId.getInstance());
- return client;
-
- }
-
- /**
- * init flink-client Configuration
- * @return
- */
- public Configuration initConfiguration() {
- Configuration configuration = new Configuration();
- configuration.setString(JobManagerOptions.ADDRESS, address);
- configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
- configuration.setInteger(RestOptions.PORT, port);
- return configuration;
-
+ try {
+ return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ log.error("get flink client failed: ", e);
+ throw new Exception("get flink client failed: " + e.getMessage());
+ }
}
/**
- * get job status
- * @return
+ * Get the job status by the given job id.
*/
- public JobStatus getJobStatus(String jobId) {
+ public JobStatus getJobStatus(String jobId) throws Exception {
try {
RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID);
return jobStatus.get();
} catch (Exception e) {
- log.error("get job status error: ", e);
+ log.error("get job status by jobId={} failed: ", jobId, e);
+ throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
}
- return null;
}
/**
- * get job detail
- * @return
+ * Get job detail by the given job id.
*/
- public JobDetailsInfo getJobDetail(String jobId) {
+ public JobDetailsInfo getJobDetail(String jobId) throws Exception {
try {
RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID);
return jobDetails.get();
} catch (Exception e) {
- log.error("get job detail error: ", e);
+ log.error("get job detail by jobId={} failed: ", jobId, e);
+ throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
}
- return null;
}
/**
- * submit job
- * @param flinkInfo
+ * Submit the Flink job.
*/
- public String submitJobs(FlinkInfo flinkInfo) {
- RestClusterClient<StandaloneClusterId> client = null;
- String localJarPath = flinkInfo.getLocalJarPath();
- String[] programArgs = genProgramArgs(flinkInfo);
+ public String submit(FlinkInfo flinkInfo) throws Exception {
try {
- client = getFlinkClient();
- Configuration configuration = initConfiguration();
- File jarFile = new File(localJarPath);
- SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
- PackagedProgram program = PackagedProgram.newBuilder()
- .setConfiguration(configuration)
- .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
- .setJarFile(jarFile)
- .setArguments(programArgs)
- .setSavepointRestoreSettings(savepointRestoreSettings).build();
- JobGraph jobGraph =
- PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
- CompletableFuture<JobID> result = client.submitJob(jobGraph);
- return result.get().toString();
+ SavepointRestoreSettings settings = SavepointRestoreSettings.none();
+ return submitJobBySavepoint(flinkInfo, settings);
} catch (Exception e) {
- log.error("submit job error: ", e);
+ log.error("submit job from info {} failed: ", flinkInfo, e);
+ throw new Exception("submit job failed: " + e.getMessage());
}
- return null;
}
/**
- * restore job with savepoint
- * @param flinkInfo
+ * Restore the Flink job.
*/
- public String restore(FlinkInfo flinkInfo) {
- RestClusterClient<StandaloneClusterId> client = null;
- String localJarPath = flinkInfo.getLocalJarPath();
- String[] programArgs = genProgramArgs(flinkInfo);
+ public String restore(FlinkInfo flinkInfo) throws Exception {
try {
- client = getFlinkClient();
- Configuration configuration = initConfiguration();
- File jarFile = new File(localJarPath);
if (StringUtils.isNotEmpty(flinkInfo.getSavepointPath())) {
- SavepointRestoreSettings savepointRestoreSettings =
- SavepointRestoreSettings.forPath(savepointDirectory,false);
- PackagedProgram program = PackagedProgram.newBuilder()
- .setConfiguration(configuration)
- .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
- .setJarFile(jarFile)
- .setArguments(programArgs)
- .setSavepointRestoreSettings(savepointRestoreSettings).build();
- JobGraph jobGraph =
- PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
- CompletableFuture<JobID> result = client.submitJob(jobGraph);
- return result.get().toString();
+ SavepointRestoreSettings settings = SavepointRestoreSettings.forPath(savepointDirectory, false);
+ return submitJobBySavepoint(flinkInfo, settings);
+ } else {
+ log.warn("skip to restore as the savepoint path was empty " + flinkInfo);
+ return null;
}
} catch (Exception e) {
- log.error("restore job error: ", e);
+ log.error("restore job from info {} failed: ", flinkInfo, e);
+ throw new Exception("restore job failed: " + e.getMessage());
}
- return null;
}
/**
- * stop job with savepoint
- * @param jobId
- * @param requestBody
- * @return
+ * Submit the job with the savepoint settings.
*/
- public String stopJobs(String jobId, StopWithSavepointRequestBody requestBody) {
+ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
+ String localJarPath = flinkInfo.getLocalJarPath();
+ File jarFile = new File(localJarPath);
+ String[] programArgs = genProgramArgs(flinkInfo);
+
+ PackagedProgram program = PackagedProgram.newBuilder()
+ .setConfiguration(configuration)
+ .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
+ .setJarFile(jarFile)
+ .setArguments(programArgs)
+ .setSavepointRestoreSettings(settings).build();
+ JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
+
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ CompletableFuture<JobID> result = client.submitJob(jobGraph);
+ return result.get().toString();
+ }
+
+ /**
+ * Stop the Flink job with the savepoint.
+ */
+ public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception {
try {
RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<String> stopResult =
- client.stopWithSavepoint(jobID,requestBody.isDrain(),requestBody.getTargetDirectory());
+ CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, request.isDrain(),
+ request.getTargetDirectory());
return stopResult.get();
} catch (Exception e) {
- log.error("stop job error: ", e);
+ log.error("stop job {} and request {} failed: ", jobId, request, e);
+ throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
}
- return null;
}
/**
- * cancel job
- * @param jobId
- * @return
+ * Cancel the Flink job.
*/
- public void cancelJobs(String jobId) {
+ public void cancelJob(String jobId) throws Exception {
try {
RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
client.cancel(jobID);
} catch (Exception e) {
- log.error("cancel job error: ", e);
+ log.error("cancel job {} failed: ", jobId, e);
+ throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
}
}
/**
- * build the program of job
- * @param flinkInfo
- * @return
+ * Build the program of the Flink job.
*/
private String[] genProgramArgs(FlinkInfo flinkInfo) {
- List<String> list = new ArrayList<>();
+ List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
list.add("-dataflow.info.file");
@@ -271,14 +244,14 @@ public class FlinkService {
list.add(flinkInfo.getSourceType());
list.add("-sink.type");
list.add(flinkInfo.getSinkType());
- // one group one stream now
+ // TODO Support more than one stream with one group
if (flinkInfo.getInlongStreamInfoList() != null
&& !flinkInfo.getInlongStreamInfoList().isEmpty()) {
InlongStreamInfo inlongStreamInfo = flinkInfo.getInlongStreamInfoList().get(0);
list.add("-job.orderly.output");
list.add(String.valueOf(inlongStreamInfo.getSyncSend()));
}
- String[] data = list.toArray(new String[list.size()]);
- return data;
+ return list.toArray(new String[0]);
}
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
similarity index 80%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 7399933de..1350a7276 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntergrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -21,37 +21,28 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequestBody;
+import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@Slf4j
-public class IntergrationTaskRunner implements Runnable {
+public class IntegrationTaskRunner implements Runnable {
- private FlinkService flinkService;
- private FlinkInfo flinkInfo;
- private Integer commitType;
private static final Integer TRY_MAX_TIMES = 60;
private static final Integer INTERVAL = 10;
+ private final FlinkService flinkService;
+ private final FlinkInfo flinkInfo;
+ private final Integer commitType;
- public IntergrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo,Integer commitType) {
+ public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo, Integer commitType) {
this.flinkService = flinkService;
this.flinkInfo = flinkInfo;
this.commitType = commitType;
}
- /**
- * When an object implementing interface <code>Runnable</code> is used
- * to create a thread, starting the thread causes the object's
- * <code>run</code> method to be called in that separately executing
- * thread.
- * <p>The general contract of the method <code>run</code> is that it may
- * take any action whatsoever.
- *
- * @see Thread#run()
- */
@Override
public void run() {
TaskCommitType commitType = TaskCommitType.getInstance(this.commitType);
@@ -61,7 +52,7 @@ public class IntergrationTaskRunner implements Runnable {
switch (commitType) {
case START_NOW:
try {
- String jobId = flinkService.submitJobs(flinkInfo);
+ String jobId = flinkService.submit(flinkInfo);
flinkInfo.setJobId(jobId);
log.info("Start job {} success in backend", jobId);
} catch (Exception e) {
@@ -86,10 +77,10 @@ public class IntergrationTaskRunner implements Runnable {
break;
case RESTART:
try {
- StopWithSavepointRequestBody stopWithSavepointRequestBody = new StopWithSavepointRequestBody();
- stopWithSavepointRequestBody.setDrain(Constants.DRAIN);
- stopWithSavepointRequestBody.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
- String location = flinkService.stopJobs(flinkInfo.getJobId(), stopWithSavepointRequestBody);
+ StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
+ stopWithSavepointRequest.setDrain(Constants.DRAIN);
+ stopWithSavepointRequest.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
+ String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
int times = 0;
@@ -124,11 +115,11 @@ public class IntergrationTaskRunner implements Runnable {
break;
case STOP:
try {
- StopWithSavepointRequestBody stopWithSavepointRequestBody = new StopWithSavepointRequestBody();
- stopWithSavepointRequestBody.setDrain(Constants.DRAIN);
+ StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
+ stopWithSavepointRequest.setDrain(Constants.DRAIN);
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
- stopWithSavepointRequestBody.setTargetDirectory(flinkConfig.getSavepointDirectory());
- String location = flinkService.stopJobs(flinkInfo.getJobId(), stopWithSavepointRequestBody);
+ stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
+ String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId {} savepoint: {} ", flinkInfo.getJobId(), location);
} catch (Exception e) {
@@ -141,7 +132,7 @@ public class IntergrationTaskRunner implements Runnable {
break;
case DELETE:
try {
- flinkService.cancelJobs(flinkInfo.getJobId());
+ flinkService.cancelJob(flinkInfo.getJobId());
log.info("delete job {} success in backend", flinkInfo.getJobId());
JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId());
if (jobStatus.isTerminalState()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java
deleted file mode 100644
index 4b9fcdc69..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/ManagerFlinkTask.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.flink;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.BusinessExceptionDesc;
-import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
-import org.apache.inlong.manager.plugin.util.FlinkUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
-
-/**
- * flink operation
- */
-@Slf4j
-public class ManagerFlinkTask {
-
- private FlinkService flinkService;
-
- public ManagerFlinkTask(FlinkService flinkService) {
- this.flinkService = flinkService;
- }
-
- public void start(FlinkInfo flinkInfo) throws IOException {
- String jobId = flinkInfo.getJobId();
- //Start a new task without savepoint
- if (StringUtils.isEmpty(jobId)) {
- try {
- Future<?> future = TaskRunService.submit(
- new IntergrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.START_NOW.getCode()));
- future.get();
- } catch (Exception e) {
- log.warn("Flink job some exception [{}]", e);
- throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
- + e.getMessage());
- }
- //Restore an old task with savepoint
- } else {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
- if (jobDetailsInfo == null) {
- throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
- + String.format("Flink job %s not found", flinkInfo.getJobId()));
- }
- JobStatus jobStatus = jobDetailsInfo.getJobStatus();
- if (!jobStatus.isTerminalState() && StringUtils.isNotEmpty(flinkInfo.getSavepointPath())) {
- try {
- Future<?> future = TaskRunService.submit(
- new IntergrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.RESUME.getCode()));
- future.get();
- } catch (Exception e) {
- log.warn("Flink job some exception [{}]", e);
- throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
- + e.getMessage());
- }
- } else {
- throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
- + "not support resume when task has been terminaled or SavepointPath is null ");
- }
- }
- }
-
- /**
- * genPath
- * @param flinkInfo
- * @param dataflow
- */
- public void genPath(FlinkInfo flinkInfo, String dataflow) {
- String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
- path = path.substring(0, path.lastIndexOf(File.separator));
- if (path.contains("inlong-manager")) {
- String startPath = path.substring(0, path.indexOf("inlong-manager"));
- String resource = "inlong-sort";
- String basePath = startPath + resource;
- File file = new File(basePath);
- if (!file.exists()) {
- log.warn("file path:[{}] not found sort jar", basePath);
- throw new BusinessException(BusinessExceptionDesc.InternalError + " not found inlong-sort");
- }
- String jarPath = findFiles(basePath,"^sort-single-tenant.*jar$");
- log.info("sort-single-tenant path :{}",jarPath);
- flinkInfo.setLocalJarPath(jarPath);
- } else {
- throw new BusinessException(BusinessExceptionDesc.InternalError + " inlong-manager dic not found");
- }
- if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), dataflow)) {
- flinkInfo.setLocalConfPath(path + File.separator + flinkInfo.getJobName());
- } else {
- throw new BusinessException(BusinessExceptionDesc.InternalError + " write file fail");
- }
- }
-
- /**
- * restart flinkjob
- * @param flinkInfo
- * @throws Exception
- * @throws IOException
- */
- public void restart(FlinkInfo flinkInfo) throws Exception, IOException {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
- if (jobDetailsInfo == null) {
- throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
- + String.format("Flink job %s not found", flinkInfo.getJobId()));
- }
- JobStatus jobStatus = jobDetailsInfo.getJobStatus();
- if (!jobStatus.isTerminalState()) {
- Future<?> future = TaskRunService.submit(
- new IntergrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.RESTART.getCode()));
- future.get();
- } else {
- throw new BusinessException(BusinessExceptionDesc.FailedOperation.getMessage()
- + String.format("Flink job %s restart fail", flinkInfo.getJobId()));
- }
- }
-
- /**
- * stop flinkjob
- * @param flinkInfo
- * @throws Exception
- */
- public void stop(FlinkInfo flinkInfo) throws Exception {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
- if (jobDetailsInfo == null) {
- throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
- + String.format("Flink job %s not found", flinkInfo.getJobId()));
- }
- JobStatus jobStatus = jobDetailsInfo.getJobStatus();
- if (!jobStatus.isTerminalState()) {
- Future<?> future = TaskRunService.submit(
- new IntergrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.STOP.getCode()));
- future.get();
- } else {
- throw new BusinessException(BusinessExceptionDesc.FailedOperation.getMessage()
- + String.format("Flink job %s stop fail", flinkInfo.getJobId()));
- }
- }
-
- /**
- * delete flinkjob
- * @param flinkInfo
- * @throws Exception
- */
- public void delete(FlinkInfo flinkInfo) throws Exception {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
- if (jobDetailsInfo == null) {
- throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
- + String.format("Flink job %s not found", flinkInfo.getJobId()));
- }
- JobStatus jobStatus = jobDetailsInfo.getJobStatus();
- if (jobStatus.isTerminalState()) {
- if (jobStatus.isGloballyTerminalState()) {
- throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
- + "not support delete when task has been terminaled globally");
- } else {
- throw new BusinessException(BusinessExceptionDesc.UnsupportedOperation
- + "not support delete when task has been terminaled locally");
- }
- } else {
- Future<?> future = TaskRunService.submit(
- new IntergrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.DELETE.getCode()));
- future.get();
- }
- }
-
- /**
- * poll status
- * @param flinkInfo
- * @throws InterruptedException
- */
- public void pollFlinkStatus(FlinkInfo flinkInfo) throws InterruptedException {
- if (flinkInfo.isException()) {
- throw new BusinessException("startup fail reason:" + flinkInfo.getExceptionMsg());
- }
- TimeUnit.SECONDS.sleep(5);
- while (true) {
- if (StringUtils.isNotEmpty(flinkInfo.getJobId())) {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(flinkInfo.getJobId());
- if (jobDetailsInfo == null) {
- throw new BusinessException(BusinessExceptionDesc.ResourceNotFound
- + String.format("Flink job %s not found", flinkInfo.getJobId()));
- }
- JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-
- if (jobStatus.isTerminalState()) {
- log.warn("flink job fail for status [{}]", jobStatus);
- throw new BusinessException(
- "startup fail " + jobStatus + flinkInfo.getExceptionMsg());
- } else {
- if (jobStatus == RUNNING) {
- log.info("Flink status is Running");
- break;
- } else {
- log.info("poll Flink status");
- TimeUnit.SECONDS.sleep(5);
- continue;
- }
- }
- }
- }
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
index 277addcf3..aedf0128b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
@@ -25,7 +25,7 @@ public class JarEntryInfo {
/**
* description
*/
- private String deacription;
+ private String description;
/**
* name
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
similarity index 93%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
index 3f0729b32..ab071399b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequestbody.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
@@ -22,9 +22,9 @@ import lombok.Data;
import java.util.List;
@Data
-public class JarRunRequestbody {
+public class JarRunRequest {
- private Boolean allowNonRestoredState;
+ private Boolean allowNonRestoredState;
private String entryClass;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
index cfbe8d083..868d84b7d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
@@ -33,7 +33,7 @@ public class LoginConf {
private Integer restPort;
/**
- * jobmanager port
+ * job manager port
*/
private Integer jobManagerPort;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
similarity index 95%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
index 3e51ee0c6..13808023c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequestBody.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
@@ -22,7 +22,7 @@ import lombok.Setter;
@Getter
@Setter
-public class StopWithSavepointRequestBody {
+public class StopWithSavepointRequest {
/**
* tag
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java
deleted file mode 100644
index 59e3f2b31..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/BusinessExceptionDesc.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.flink.enums;
-
-public enum BusinessExceptionDesc {
-
- ResourceNotFound("ResourceNotFound"),
- InternalError("InternalError"),
- FailedOperation("FailedOperation"),
- UnsupportedOperation("UnsupportedOperation"),
- ;
-
- private final String message;
-
- BusinessExceptionDesc(String message) {
- this.message = message;
- }
-
- public String getMessage() {
- return message;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
similarity index 97%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 5db9084da..0a44c2bd8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.plugin.flink;
+package org.apache.inlong.manager.plugin.flink.enums;
public class Constants {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index a6924ff2b..c7b1107d7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -18,16 +18,16 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -42,6 +42,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class DeleteSortListener implements SortOperateListener {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -49,47 +50,59 @@ public class DeleteSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- String inlongGroupId = context.getProcessForm().getInlongGroupId();
- ObjectMapper objectMapper = new ObjectMapper();
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
- InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> inlongGroupExtInfoList = inlongGroupInfo.getExtList();
- log.info("inlongGroupExtInfoList:{}", inlongGroupExtInfoList);
- Map<String, String> kvConf =
- inlongGroupExtInfoList.stream().collect(Collectors.toMap(InlongGroupExtInfo::getKeyName,
- InlongGroupExtInfo::getKeyValue));
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+ InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+ List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+ log.info("inlong group ext info: {}", extList);
+
+ Map<String, String> kvConf = extList.stream().collect(
+ Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("groupId [%s] not add deleteProcess listener, "
- + "as the sortProperties is empty", inlongGroupId);
- log.warn(message);
+ String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
+ groupId);
+ log.error(message);
return ListenerResult.fail(message);
}
- Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
- new TypeReference<Map<String, String>>(){});
+
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ new TypeReference<Map<String, String>>() {
+ });
kvConf.putAll(result);
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for groupId [%s]", groupId);
+ return ListenerResult.fail(message);
+ }
FlinkInfo flinkInfo = new FlinkInfo();
-
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
- Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
flinkInfo.setJobId(jobId);
-
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- managerFlinkTask.delete(flinkInfo);
+ flinkOperation.delete(flinkInfo);
+ log.info("job delete success for [{}]", jobId);
+ return ListenerResult.success();
} catch (Exception e) {
- log.error("pause exception ", e);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo);
+
+ String message = String.format("delete sort failed for groupId [%s] ", groupId);
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
}
- return ListenerResult.success();
}
@Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 47c92547e..b81c72ee6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -19,17 +19,17 @@ package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -54,68 +54,83 @@ public class RestartSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- String inlongGroupId = context.getProcessForm().getInlongGroupId();
- ObjectMapper objectMapper = new ObjectMapper();
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
- InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
- log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
- Map<String, String> kvConf = inlongGroupExtInfos.stream()
- .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+ InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+ List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+ log.info("inlong group ext info: {}", extList);
+
+ Map<String, String> kvConf = extList.stream().collect(
+ Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("inlongGroupId:%s not add restartProcess listener,sortProperties is empty",
- inlongGroupId);
- log.warn(message);
+ String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
+ groupId);
+ log.error(message);
return ListenerResult.fail(message);
}
- Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
- new TypeReference<Map<String, String>>(){});
+
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ new TypeReference<Map<String, String>>() {
+ });
kvConf.putAll(result);
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for groupId [%s]", groupId);
+ return ListenerResult.fail(message);
+ }
String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("groupId [%s] not add restartProcess listener, "
- + "as the dataflows is empty", inlongGroupId);
- log.warn(message);
+ String message = String.format("dataflow is empty for groupId [%s]", groupId);
+ log.error(message);
return ListenerResult.fail(message);
}
- Map<String, JsonNode> dataflowMap = objectMapper.convertValue(objectMapper.readTree(dataFlows),
- new TypeReference<Map<String, JsonNode>>(){});
+
+ // TODO Support more than one dataflow in one sort job
+ Map<String, JsonNode> dataflowMap = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
+ });
Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
JsonNode dataFlow = null;
if (dataflowOptional.isPresent()) {
dataFlow = dataflowOptional.get();
}
if (Objects.isNull(dataFlow)) {
- String message = String.format("groupId [%s] not add restartProcess listener, "
- + "as the dataflow is empty", inlongGroupId);
+ String message = String.format("dataflow is empty for groupId [%s]", groupId);
log.warn(message);
return ListenerResult.fail(message);
}
- String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobName(jobName);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
- Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
+ FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
-
+ String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
+ flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
- managerFlinkTask.genPath(flinkInfo,dataFlow.toString());
-
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- managerFlinkTask.restart(flinkInfo);
+ flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.restart(flinkInfo);
+ log.info("job restart success for [{}]", jobId);
+ return ListenerResult.success();
} catch (Exception e) {
- log.error("pause exception ", e);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo);
+
+ String message = String.format("restart sort failed for groupId [%s] ", groupId);
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
}
- return ListenerResult.success();
}
@Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 919379afa..0f74b0164 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -19,16 +19,17 @@ package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -45,6 +46,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class StartupSortListener implements SortOperateListener {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -52,100 +54,98 @@ public class StartupSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) context.getProcessForm();
- InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
- log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
- Map<String, String> kvConf = inlongGroupExtInfos.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ String message = String.format("process form was not GroupResource for groupId [%s]", groupId);
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
+ List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+ log.info("inlong group ext info: {}", extList);
+
+ Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
&& StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
InlongGroupExtInfo::getKeyName,
InlongGroupExtInfo::getKeyValue));
+ String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
- String inlongGroupId = context.getProcessForm().getInlongGroupId();
if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("groupId [%s] not add startupProcess listener, "
- + "as the dataflows is empty", inlongGroupId);
- log.warn(message);
+ String message = String.format("dataflow is empty for groupId [%s]", groupId);
+ log.error(message);
return ListenerResult.fail(message);
}
- Map<String, JsonNode> dataflowMap = objectMapper.convertValue(objectMapper.readTree(dataFlows),
- new TypeReference<Map<String, JsonNode>>(){});
+ Map<String, JsonNode> dataflowMap = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
+ });
Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
JsonNode dataFlow = null;
if (dataflowOptional.isPresent()) {
dataFlow = dataflowOptional.get();
}
if (Objects.isNull(dataFlow)) {
- String message = String.format("groupId [%s] not add startupProcess listener, "
- + "as the dataflow is empty", inlongGroupId);
+ String message = String.format("dataflow is empty for groupId [%s]", groupId);
log.warn(message);
- ListenerResult.fail(message);
+ return ListenerResult.fail(message);
}
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
- new TypeReference<Map<String, String>>(){});
- kvConf.putAll(result);
- }
FlinkInfo flinkInfo = new FlinkInfo();
- parseDataflow(dataFlow, flinkInfo);
-
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
-
- flinkInfo.setInlongStreamInfoList(groupResourceProcessForm.getInlongStreamInfoList());
+ String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+ flinkInfo.setInlongStreamInfoList(groupResourceForm.getInlongStreamInfoList());
+ parseDataflow(dataFlow, flinkInfo);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
- managerFlinkTask.genPath(flinkInfo, dataFlow.toString());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- managerFlinkTask.start(flinkInfo);
- log.info("the jobId {} submit success", flinkInfo.getJobId());
+ flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.start(flinkInfo);
+ log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
} catch (Exception e) {
- log.warn("startup exception: ", e);
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ // TODO why call 4 times
+ flinkOperation.pollJobStatus(flinkInfo);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- managerFlinkTask.pollFlinkStatus(flinkInfo);
- }
-
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo);
- saveInfo(context.getProcessForm().getInlongGroupId(), InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(),
- inlongGroupExtInfos);
+ String message = String.format("startup sort failed for groupId [%s] ", groupId);
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
+ }
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+ flinkOperation.pollJobStatus(flinkInfo);
return ListenerResult.success();
}
/**
- * save info
- * @param inlongGroupId
- * @param keyName
- * @param keyValue
- * @param inlongGroupExtInfos
+ * Save ext info into list.
*/
- private void saveInfo(String inlongGroupId, String keyName, String keyValue,
- List<InlongGroupExtInfo> inlongGroupExtInfos) {
- InlongGroupExtInfo inlongGroupExtInfo = new InlongGroupExtInfo();
- inlongGroupExtInfo.setInlongGroupId(inlongGroupId);
- inlongGroupExtInfo.setKeyName(keyName);
- inlongGroupExtInfo.setKeyValue(keyValue);
- inlongGroupExtInfos.add(inlongGroupExtInfo);
+ private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) {
+ InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+ extInfo.setInlongGroupId(inlongGroupId);
+ extInfo.setKeyName(keyName);
+ extInfo.setKeyValue(keyValue);
+ extInfoList.add(extInfo);
}
/**
- * init FlinkConf
- * @param dataflow
- * @param flinkInfo
+ * Init FlinkConf
*/
- private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
+ private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
String sourceType = sourceInfo.get(Constants.TYPE).asText();
flinkInfo.setSourceType(sourceType);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index a642660e1..c822b828e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -18,16 +18,16 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.ManagerFlinkTask;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -42,6 +42,7 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class SuspendSortListener implements SortOperateListener {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -49,46 +50,59 @@ public class SuspendSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- String inlongGroupId = context.getProcessForm().getInlongGroupId();
- ObjectMapper objectMapper = new ObjectMapper();
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) context.getProcessForm();
- InlongGroupInfo inlongGroupInfo = updateGroupProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> inlongGroupExtInfos = inlongGroupInfo.getExtList();
- log.info("inlongGroupExtInfos:{}", inlongGroupExtInfos);
- Map<String, String> kvConf = inlongGroupExtInfos.stream().collect(Collectors.toMap(
- InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ String message = String.format("process form was not UpdateGroup for groupId [%s]", groupId);
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ UpdateGroupProcessForm updateGroupForm = (UpdateGroupProcessForm) processForm;
+ InlongGroupInfo inlongGroupInfo = updateGroupForm.getGroupInfo();
+ List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
+ log.info("inlong group ext info: {}", extList);
+
+ Map<String, String> kvConf = extList.stream().collect(
+ Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("groupId [%s] not add suspendProcess listener, "
- + "as the sortProperties is empty", inlongGroupId);
- log.warn(message);
+ String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
+ groupId);
+ log.error(message);
return ListenerResult.fail(message);
}
- Map<String, String> result = objectMapper.convertValue(objectMapper.readTree(sortExt),
- new TypeReference<Map<String, String>>(){});
+
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ new TypeReference<Map<String, String>>() {
+ });
kvConf.putAll(result);
+ String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for groupId [%s]", groupId);
+ return ListenerResult.fail(message);
+ }
FlinkInfo flinkInfo = new FlinkInfo();
-
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
- Preconditions.checkNotEmpty(jobId, "sortJobId is empty");
flinkInfo.setJobId(jobId);
-
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- ManagerFlinkTask managerFlinkTask = new ManagerFlinkTask(flinkService);
-
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- managerFlinkTask.stop(flinkInfo);
+ flinkOperation.stop(flinkInfo);
+ log.info("job suspend success for [{}]", jobId);
+ return ListenerResult.success();
} catch (Exception e) {
- log.error("pause exception ", e);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- managerFlinkTask.pollFlinkStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo);
+
+ String message = String.format("suspend sort failed for groupId [%s] ", groupId);
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
}
- return ListenerResult.success();
}
@Override
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index 5092739f3..db91d5271 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -17,10 +17,7 @@
package org.apache.inlong.manager.plugin.util;
-import com.google.gson.JsonPrimitive;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-import org.apache.inlong.manager.plugin.flink.enums.BusinessExceptionDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,66 +25,62 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
-import static org.apache.inlong.manager.plugin.flink.Constants.ADDRESS;
-import static org.apache.inlong.manager.plugin.flink.Constants.JOB_MANAGER_PORT;
-import static org.apache.inlong.manager.plugin.flink.Constants.PARALLELISM;
-import static org.apache.inlong.manager.plugin.flink.Constants.PORT;
-import static org.apache.inlong.manager.plugin.flink.Constants.SAVEPOINT_DIRECTORY;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
/**
- * flink configuration. Only one instance in the process.
- * Basically it use properties file to store configurations.
+ * Configuration file for Flink, only one instance in the process.
+ * Basically it used properties file to store.
*/
-public class FlinkConfiguration {
+public class FlinkConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfiguration.class);
private static final String DEFAULT_CONFIG_FILE = "flink-sort-plugin.properties";
+ private static final String INLONG_MANAGER = "inlong-manager";
- private final Map<String, JsonPrimitive> configStorage = new HashMap<>();
+ private final FlinkConfig flinkConfig;
- private FlinkConfig flinkConfig;
+ /**
+ * load config from flink file.
+ */
+ public FlinkConfiguration() throws Exception {
+ String path = formatPath();
+ flinkConfig = getFlinkConfigFromFile(path);
+ }
/**
* fetch DEFAULT_CONFIG_FILE full path
- * @return
*/
- private String formatPath() {
+ private String formatPath() throws Exception {
String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
- LOGGER.info("format first path {}",path);
- String inlongManager = "inlong-manager";
- if (path.contains(inlongManager)) {
- path = path.substring(0, path.indexOf(inlongManager));
- String confPath = path + inlongManager + File.separator + "plugins" + File.separator + DEFAULT_CONFIG_FILE;
- LOGGER.info("flink-sort-plugin.properties path : {}",confPath);
- File file = new File(confPath);
- if (!file.exists()) {
- LOGGER.warn("plugins config file path:[{}] not found flink-sort-plugin.properties", confPath);
- throw new BusinessException(BusinessExceptionDesc.InternalError
- + " not found flink-sort-plugin.properties");
- }
- return confPath;
- } else {
- throw new BusinessException(BusinessExceptionDesc.InternalError + "plugins dictionary not found ");
+ LOGGER.info("format first path {}", path);
+
+ int index = path.indexOf(INLONG_MANAGER);
+ if (index == -1) {
+ throw new Exception(INLONG_MANAGER + " path not found in " + path);
}
- }
- /**
- * load config from flink file.
- */
- public FlinkConfiguration() throws IOException {
- String path = formatPath();
- flinkConfig = getFlinkConfigFromFile(path);
+ path = path.substring(0, index);
+ String confPath = path + INLONG_MANAGER + File.separator + "plugins" + File.separator + DEFAULT_CONFIG_FILE;
+ File file = new File(confPath);
+ if (!file.exists()) {
+ String message = String.format("not found %s in path %s", DEFAULT_CONFIG_FILE, confPath);
+ LOGGER.error(message);
+ throw new Exception(message);
+ }
+ LOGGER.info("after format, {} located in {}", DEFAULT_CONFIG_FILE, confPath);
+ return confPath;
}
/**
- * get flinkcongfig
- * @return
+ * get flink config
*/
public FlinkConfig getFlinkConfig() {
return flinkConfig;
@@ -95,16 +88,11 @@ public class FlinkConfiguration {
/**
* parse properties
- * @param fileName
- * @return
- * @throws IOException
*/
- private FlinkConfig getFlinkConfigFromFile(String fileName) throws IOException {
+ private FlinkConfig getFlinkConfigFromFile(String fileName) throws IOException {
Properties properties = new Properties();
BufferedReader bufferedReader = new BufferedReader(new FileReader(fileName));
properties.load(bufferedReader);
- properties.forEach((key, value) -> configStorage.put((String) key,
- new JsonPrimitive((String) value)));
FlinkConfig flinkConfig = new FlinkConfig();
flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
flinkConfig.setAddress(properties.getProperty(ADDRESS));
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 7e3654fd8..f610c652f 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -36,25 +36,23 @@ import java.util.regex.Pattern;
@Slf4j
public class FlinkUtils {
+
public static final String BASE_DIRECTORY = "config";
- public static final List<String> FLINK_VERSION_COLLECTION = Arrays.asList("Flink-1.13");
+ public static final List<String> FLINK_VERSION_COLLECTION = Collections.singletonList("Flink-1.13");
/**
* getLatestFlinkVersion
- * @param supportedFlink
- * @return
*/
- public static String getLatestFlinkVersion(String [] supportedFlink) {
+ public static String getLatestFlinkVersion(String[] supportedFlink) {
if (Objects.isNull(supportedFlink)) {
return null;
}
Arrays.sort(supportedFlink, Collections.reverseOrder());
String latestFinkVersion = null;
- for (int i = 0; i < supportedFlink.length; i++) {
- String flinkVersion = supportedFlink[i];
+ for (String flinkVersion : supportedFlink) {
latestFinkVersion = FLINK_VERSION_COLLECTION.stream()
- .filter(v -> v.equals(flinkVersion)).findFirst().orElse(null);
+ .filter(v -> v.equals(flinkVersion)).findFirst().orElse(null);
if (Objects.nonNull(latestFinkVersion)) {
return latestFinkVersion;
}
@@ -64,8 +62,6 @@ public class FlinkUtils {
/**
* print exception
- * @param throwable
- * @return
*/
public static String getExceptionStackMsg(Throwable throwable) {
StringWriter stringWriter = new StringWriter();
@@ -75,24 +71,22 @@ public class FlinkUtils {
/**
* fetch sort-single-tenant jar path
- * @param baseDirName
- * @return
*/
- public static String findFiles(String baseDirName,String pattern) {
+ public static String findFiles(String baseDirName, String pattern) {
File baseDir = new File(baseDirName);
if (!baseDir.exists() || !baseDir.isDirectory()) {
log.error("baseDirName find fail :{}", baseDirName);
return null;
}
- String tempName = null;
+ String tempName;
File tempFile;
File[] files = baseDir.listFiles();
- if (files.length == 0) {
+ if (files == null || files.length == 0) {
log.info("baseDirName is empty");
return null;
}
- for (int i = 0; i < files.length; i++) {
- tempFile = files[i];
+ for (File file : files) {
+ tempFile = file;
tempName = tempFile.getName();
Pattern jarPathPattern = Pattern.compile(pattern);
Matcher matcher = jarPathPattern.matcher(tempName);
@@ -106,9 +100,6 @@ public class FlinkUtils {
/**
* get value
- * @param key
- * @param defaultValue
- * @return
*/
public static String getValue(String key, String defaultValue) {
return StringUtils.isNotEmpty(key) ? key : defaultValue;
@@ -116,8 +107,6 @@ public class FlinkUtils {
/**
* getConfigDirectory
- * @param name
- * @return
*/
public static String getConfigDirectory(String name) {
return BASE_DIRECTORY + File.separator + name;
@@ -125,10 +114,6 @@ public class FlinkUtils {
/**
* writeConfigToFile
- * @param configJobDirectory
- * @param configFileName
- * @param content
- * @return
*/
public static boolean writeConfigToFile(String configJobDirectory, String configFileName, String content) {
File file = new File(configJobDirectory);
@@ -151,8 +136,6 @@ public class FlinkUtils {
/**
* delete configuration file
- * @param name
- * @return
*/
public static boolean deleteConfigFile(String name) {
String configDirectory = getConfigDirectory(name);
@@ -161,7 +144,7 @@ public class FlinkUtils {
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
- log.error("delete %s failed", configDirectory);
+ log.error("delete {} failed", configDirectory, e);
return false;
}
}
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index 94008dcba..fa150125c 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.plugin.flink.Constants;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.junit.Test;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 7bdc16dd1..49d6b13a4 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -54,7 +54,7 @@ public class WorkflowFormParserUtils {
UserTask userTask = (UserTask) task;
try {
- JavaType javaType = JsonUtils.MAPPER.constructType(userTask.getFormClass());
+ JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(userTask.getFormClass());
return JsonUtils.parse(workflowTaskEntity.getFormData(), javaType);
} catch (Exception e) {
log.error("task form parse failed, form is: {}", workflowTaskEntity.getFormData(), e);
@@ -74,7 +74,7 @@ public class WorkflowFormParserUtils {
}
try {
- JavaType javaType = JsonUtils.MAPPER.constructType(process.getFormClass());
+ JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(process.getFormClass());
return JsonUtils.parse(form, javaType);
} catch (Exception e) {
log.error("process form parse failed, form is: {}", form, e);