You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 11:39:15 UTC

[incubator-inlong] branch master updated: [INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 956b8db  [INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)
956b8db is described below

commit 956b8db1554ded413e39a756fa22c7bf3de983e9
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Fri Feb 25 19:39:09 2022 +0800

    [INLONG-2688][Agent] Agent support freeze and restart task when needed (#2695)
---
 .../apache/inlong/agent/core/job/JobManager.java   | 19 +++++++++++++--
 .../agent/plugin/fetcher/ManagerFetcher.java       | 28 +++++++++++++++++++++-
 .../apache/inlong/common/enums/ManagerOpEnum.java  |  3 ++-
 3 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 6792611..1cee7a9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -128,13 +128,26 @@ public class JobManager extends AbstractDaemon {
      * @param profile - job profile.
      */
     public boolean submitFileJobProfile(JobProfile profile) {
+        return submitJobProfile(profile, false);
+    }
+
+    /**
+     * add file job profile
+     *
+     * @param profile - job profile.
+     */
+    public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
         if (profile == null || !profile.allRequiredKeyExist()) {
             LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
                     : profile.toJsonStr());
             return false;
         }
         String jobId = profile.get(JOB_ID);
-        profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
+        if (singleJob) {
+            profile.set(JOB_INSTANCE_ID, jobId);
+        } else {
+            profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
+        }
         LOGGER.info("submit job profile {}", profile.toJsonStr());
         getJobConfDb().storeJobFirstTime(profile);
         addJob(new Job(profile));
@@ -164,13 +177,15 @@ public class JobManager extends AbstractDaemon {
      *
      * @param jobInstancId
      */
-    public void deleteJob(String jobInstancId) {
+    public boolean deleteJob(String jobInstancId) {
         JobWrapper jobWrapper = jobs.remove(jobInstancId);
         if (jobWrapper != null) {
             LOGGER.info("delete job instance with job id {}", jobInstancId);
             jobWrapper.cleanup();
             getJobConfDb().deleteJob(jobInstancId);
+            return true;
         }
+        return false;
     }
 
     /**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 78638c5..489f4ae 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -87,6 +87,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_LOCAL_IP
 import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
 import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
 import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
 
@@ -274,7 +275,11 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
 
         for (DataConfig dataConfig : taskResult.getDataConfigs()) {
             TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
-            dealWithTdmTriggerProfile(profile);
+            if (profile.hasKey(JOB_TRIGGER)) {
+                dealWithTdmTriggerProfile(profile);
+            } else {
+                dealWithJobProfile(profile);
+            }
         }
 
         for (CmdConfig cmdConfig : taskResult.getCmdConfigs()) {
@@ -394,6 +399,27 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     }
 
     /**
+     *
+     * @param triggerProfile
+     */
+    public void dealWithJobProfile(TriggerProfile triggerProfile) {
+        ManagerOpEnum opType = ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
+        boolean success = false;
+        switch (requireNonNull(opType)) {
+            case ACTIVE:
+            case ADD:
+                success = agentManager.getJobManager().submitJobProfile(triggerProfile, true);
+                break;
+            case DEL:
+            case FROZEN:
+                success = agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId());
+                break;
+            default:
+        }
+        commandDb.saveNormalCmds(triggerProfile, success);
+    }
+
+    /**
      * check agent ip from manager
      */
     private void fetchLocalIp() {
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
index 0f98dfb..c63371f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
@@ -20,7 +20,8 @@ package org.apache.inlong.common.enums;
 import static java.util.Objects.requireNonNull;
 
 public enum ManagerOpEnum {
-    ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4), ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
+    ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4),
+    ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
 
     private int type;