You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 11:39:15 UTC
[incubator-inlong] branch master updated: [INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 956b8db [INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)
956b8db is described below
commit 956b8db1554ded413e39a756fa22c7bf3de983e9
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Fri Feb 25 19:39:09 2022 +0800
[INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)
---
.../apache/inlong/agent/core/job/JobManager.java | 19 +++++++++++++--
.../agent/plugin/fetcher/ManagerFetcher.java | 28 +++++++++++++++++++++-
.../apache/inlong/common/enums/ManagerOpEnum.java | 3 ++-
3 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 6792611..1cee7a9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -128,13 +128,26 @@ public class JobManager extends AbstractDaemon {
* @param profile - job profile.
*/
public boolean submitFileJobProfile(JobProfile profile) {
+ return submitJobProfile(profile, false);
+ }
+
+ /**
+ * add file job profile
+ *
+ * @param profile - job profile.
+ */
+ public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
: profile.toJsonStr());
return false;
}
String jobId = profile.get(JOB_ID);
- profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
+ if (singleJob) {
+ profile.set(JOB_INSTANCE_ID, jobId);
+ } else {
+ profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
+ }
LOGGER.info("submit job profile {}", profile.toJsonStr());
getJobConfDb().storeJobFirstTime(profile);
addJob(new Job(profile));
@@ -164,13 +177,15 @@ public class JobManager extends AbstractDaemon {
*
* @param jobInstancId
*/
- public void deleteJob(String jobInstancId) {
+ public boolean deleteJob(String jobInstancId) {
JobWrapper jobWrapper = jobs.remove(jobInstancId);
if (jobWrapper != null) {
LOGGER.info("delete job instance with job id {}", jobInstancId);
jobWrapper.cleanup();
getJobConfDb().deleteJob(jobInstancId);
+ return true;
}
+ return false;
}
/**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 78638c5..489f4ae 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -87,6 +87,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_LOCAL_IP
import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
@@ -274,7 +275,11 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
for (DataConfig dataConfig : taskResult.getDataConfigs()) {
TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
- dealWithTdmTriggerProfile(profile);
+ if (profile.hasKey(JOB_TRIGGER)) {
+ dealWithTdmTriggerProfile(profile);
+ } else {
+ dealWithJobProfile(profile);
+ }
}
for (CmdConfig cmdConfig : taskResult.getCmdConfigs()) {
@@ -394,6 +399,27 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
}
/**
+ *
+ * @param triggerProfile
+ */
+ public void dealWithJobProfile(TriggerProfile triggerProfile) {
+ ManagerOpEnum opType = ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
+ boolean success = false;
+ switch (requireNonNull(opType)) {
+ case ACTIVE:
+ case ADD:
+ success = agentManager.getJobManager().submitJobProfile(triggerProfile, true);
+ break;
+ case DEL:
+ case FROZEN:
+ success = agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId());
+ break;
+ default:
+ }
+ commandDb.saveNormalCmds(triggerProfile, success);
+ }
+
+ /**
* check agent ip from manager
*/
private void fetchLocalIp() {
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
index 0f98dfb..c63371f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
@@ -20,7 +20,8 @@ package org.apache.inlong.common.enums;
import static java.util.Objects.requireNonNull;
public enum ManagerOpEnum {
- ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4), ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
+ ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4),
+ ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
private int type;