You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 08:52:34 UTC

[inlong] branch master updated: [INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f6b3539ec [INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)
f6b3539ec is described below

commit f6b3539eca18ce8fbc9e51c028c7f1a9431ca59b
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Fri Jul 15 16:52:28 2022 +0800

    [INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)
---
 .../org/apache/inlong/agent/utils/AgentUtils.java  |  7 +++++
 .../org/apache/inlong/agent/db/TestRocksDbImp.java | 30 +++++++++++++++++-----
 .../agent-common/src/test/resources/binlogJob.json | 23 +++++++++++++++++
 .../apache/inlong/agent/core/conf/ConfigJetty.java |  4 +--
 .../apache/inlong/agent/core/job/JobManager.java   | 18 +------------
 .../agent/plugin/fetcher/ManagerFetcher.java       |  2 +-
 6 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index a2c8da2e3..c640e12e4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -197,6 +197,13 @@ public class AgentUtils {
         return prefix + currentTime + "_" + id + "_" + index;
     }
 
+    /**
+     * Get job id, such as "job_1"
+     */
+    public static String getSingleJobId(String prefix, String id) {
+        return prefix + id;
+    }
+
     /**
      * Sleep millisecond
      */
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
index d9814470f..f47e022ff 100644
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -17,15 +17,22 @@
 
 package org.apache.inlong.agent.db;
 
-import java.io.IOException;
-import java.util.List;
 import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.db.CommandEntity;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+
 public class TestRocksDbImp {
 
     private static RocksDbImp db;
@@ -37,6 +44,12 @@ public class TestRocksDbImp {
         db = new RocksDbImp();
     }
 
+    @AfterClass
+    public static void teardown() throws IOException {
+        db.close();
+        helper.teardownAgentHome();
+    }
+
     @Test
     public void testKeyValueDB() {
         KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
@@ -102,10 +115,15 @@ public class TestRocksDbImp {
         Assert.assertEquals("searchKey1", entityResult.getKey());
     }
 
-    @AfterClass
-    public static void teardown() throws IOException {
-        db.close();
-        helper.teardownAgentHome();
+    @Test
+    public void testBinlogJobStore() {
+        JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json");
+        JobProfileDb jobDb = new JobProfileDb(db);
+        String jobId = jobProfile.get(JOB_ID);
+        jobProfile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
+        jobDb.storeJobFirstTime(jobProfile);
+        List<JobProfile> restarts = jobDb.getRestartJobs();
+        Assert.assertEquals(1, restarts.size());
     }
 
 }
diff --git a/inlong-agent/agent-common/src/test/resources/binlogJob.json b/inlong-agent/agent-common/src/test/resources/binlogJob.json
new file mode 100644
index 000000000..db05b7e5e
--- /dev/null
+++ b/inlong-agent/agent-common/src/test/resources/binlogJob.json
@@ -0,0 +1,23 @@
+{
+  "job.version": 1,
+  "proxy.manager.host": "0.0.0.0",
+  "proxy.inlongStreamId": "test_binlog",
+  "job.binlogJob.password": "123456",
+  "job.binlogJob.hostname": "0.0.0.0",
+  "job.binlogJob.tableWhiteList": "[\\s\\S]*.*",
+  "job.id": "5",
+  "job.binlogJob.user": "root",
+  "proxy.manager.port": "8000",
+  "job.binlogJob.port": "3306",
+  "job.channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+  "job.binlogJob.serverTimezone": "UTC+8",
+  "proxy.inlongGroupId": "test_binlog",
+  "job.sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+  "job.binlogJob.snapshot.mode": "initial",
+  "job.binlogJob.offset.intervalMs": "1000",
+  "job.binlogJob.history.filename": "/data/.history",
+  "job.op": "0",
+  "proxy.sync": false,
+  "job.source": "org.apache.inlong.agent.plugin.sources.BinlogSource",
+  "job.binlogJob.databaseWhiteList": "[\\s\\S]*.*"
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index 541acd666..e5b35148d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -92,14 +92,12 @@ public class ConfigJetty implements Closeable {
                 TaskTypeEnum taskType = TaskTypeEnum
                         .getTaskType(jobProfile.getInt(JOB_SOURCE_TYPE));
                 switch (taskType) {
-                    case SQL:
-                        jobManager.submitSqlJobProfile(jobProfile);
-                        break;
                     case FILE:
                         jobManager.submitFileJobProfile(jobProfile);
                         break;
                     case KAFKA:
                     case BINLOG:
+                    case SQL:
                         jobManager.submitJobProfile(jobProfile, true);
                         break;
                     default:
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 67fb4149c..1c3a3d6f9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -147,7 +147,7 @@ public class JobManager extends AbstractDaemon {
         }
         String jobId = profile.get(JOB_ID);
         if (singleJob) {
-            profile.set(JOB_INSTANCE_ID, jobId);
+            profile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
         } else {
             profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
         }
@@ -157,22 +157,6 @@ public class JobManager extends AbstractDaemon {
         return true;
     }
 
-    /**
-     * add sql job profile
-     *
-     * @param profile job profile.
-     */
-    public boolean submitSqlJobProfile(JobProfile profile) {
-        if (isJobValid(profile)) {
-            return false;
-        }
-        profile.set(JOB_INSTANCE_ID, SQL_JOB_ID);
-        LOGGER.info("submit job profile {}", profile.toJsonStr());
-        getJobConfDb().storeJobFirstTime(profile);
-        addJob(new Job(profile));
-        return true;
-    }
-
     private boolean isJobValid(JobProfile profile) {
         if (profile == null || !profile.allRequiredKeyExist()) {
             LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 3b0d4f840..c84860929 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -272,7 +272,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         if (profile == null) {
             return;
         }
-        agentManager.getJobManager().submitSqlJobProfile(profile);
+        agentManager.getJobManager().submitJobProfile(profile, true);
     }
 
     /**