You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 08:52:34 UTC
[inlong] branch master updated: [INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f6b3539ec [INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)
f6b3539ec is described below
commit f6b3539eca18ce8fbc9e51c028c7f1a9431ca59b
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Fri Jul 15 16:52:28 2022 +0800
[INLONG-5054][Agent] Fix Agent can not import old job after reboot due to inconsistent prefix (#5067)
---
.../org/apache/inlong/agent/utils/AgentUtils.java | 7 +++++
.../org/apache/inlong/agent/db/TestRocksDbImp.java | 30 +++++++++++++++++-----
.../agent-common/src/test/resources/binlogJob.json | 23 +++++++++++++++++
.../apache/inlong/agent/core/conf/ConfigJetty.java | 4 +--
.../apache/inlong/agent/core/job/JobManager.java | 18 +------------
.../agent/plugin/fetcher/ManagerFetcher.java | 2 +-
6 files changed, 57 insertions(+), 27 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index a2c8da2e3..c640e12e4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -197,6 +197,13 @@ public class AgentUtils {
return prefix + currentTime + "_" + id + "_" + index;
}
+ /**
+ * Get job id, such as "job_1"
+ */
+ public static String getSingleJobId(String prefix, String id) {
+ return prefix + id;
+ }
+
/**
* Sleep millisecond
*/
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
index d9814470f..f47e022ff 100644
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -17,15 +17,22 @@
package org.apache.inlong.agent.db;
-import java.io.IOException;
-import java.util.List;
import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+
public class TestRocksDbImp {
private static RocksDbImp db;
@@ -37,6 +44,12 @@ public class TestRocksDbImp {
db = new RocksDbImp();
}
+ @AfterClass
+ public static void teardown() throws IOException {
+ db.close();
+ helper.teardownAgentHome();
+ }
+
@Test
public void testKeyValueDB() {
KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
@@ -102,10 +115,15 @@ public class TestRocksDbImp {
Assert.assertEquals("searchKey1", entityResult.getKey());
}
- @AfterClass
- public static void teardown() throws IOException {
- db.close();
- helper.teardownAgentHome();
+ @Test
+ public void testBinlogJobStore() {
+ JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json");
+ JobProfileDb jobDb = new JobProfileDb(db);
+ String jobId = jobProfile.get(JOB_ID);
+ jobProfile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
+ jobDb.storeJobFirstTime(jobProfile);
+ List<JobProfile> restarts = jobDb.getRestartJobs();
+ Assert.assertEquals(1, restarts.size());
}
}
diff --git a/inlong-agent/agent-common/src/test/resources/binlogJob.json b/inlong-agent/agent-common/src/test/resources/binlogJob.json
new file mode 100644
index 000000000..db05b7e5e
--- /dev/null
+++ b/inlong-agent/agent-common/src/test/resources/binlogJob.json
@@ -0,0 +1,23 @@
+{
+ "job.version": 1,
+ "proxy.manager.host": "0.0.0.0",
+ "proxy.inlongStreamId": "test_binlog",
+ "job.binlogJob.password": "123456",
+ "job.binlogJob.hostname": "0.0.0.0",
+ "job.binlogJob.tableWhiteList": "[\\s\\S]*.*",
+ "job.id": "5",
+ "job.binlogJob.user": "root",
+ "proxy.manager.port": "8000",
+ "job.binlogJob.port": "3306",
+ "job.channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+ "job.binlogJob.serverTimezone": "UTC+8",
+ "proxy.inlongGroupId": "test_binlog",
+ "job.sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+ "job.binlogJob.snapshot.mode": "initial",
+ "job.binlogJob.offset.intervalMs": "1000",
+ "job.binlogJob.history.filename": "/data/.history",
+ "job.op": "0",
+ "proxy.sync": false,
+ "job.source": "org.apache.inlong.agent.plugin.sources.BinlogSource",
+ "job.binlogJob.databaseWhiteList": "[\\s\\S]*.*"
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index 541acd666..e5b35148d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -92,14 +92,12 @@ public class ConfigJetty implements Closeable {
TaskTypeEnum taskType = TaskTypeEnum
.getTaskType(jobProfile.getInt(JOB_SOURCE_TYPE));
switch (taskType) {
- case SQL:
- jobManager.submitSqlJobProfile(jobProfile);
- break;
case FILE:
jobManager.submitFileJobProfile(jobProfile);
break;
case KAFKA:
case BINLOG:
+ case SQL:
jobManager.submitJobProfile(jobProfile, true);
break;
default:
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 67fb4149c..1c3a3d6f9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -147,7 +147,7 @@ public class JobManager extends AbstractDaemon {
}
String jobId = profile.get(JOB_ID);
if (singleJob) {
- profile.set(JOB_INSTANCE_ID, jobId);
+ profile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
} else {
profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet()));
}
@@ -157,22 +157,6 @@ public class JobManager extends AbstractDaemon {
return true;
}
- /**
- * add sql job profile
- *
- * @param profile job profile.
- */
- public boolean submitSqlJobProfile(JobProfile profile) {
- if (isJobValid(profile)) {
- return false;
- }
- profile.set(JOB_INSTANCE_ID, SQL_JOB_ID);
- LOGGER.info("submit job profile {}", profile.toJsonStr());
- getJobConfDb().storeJobFirstTime(profile);
- addJob(new Job(profile));
- return true;
- }
-
private boolean isJobValid(JobProfile profile) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 3b0d4f840..c84860929 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -272,7 +272,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
if (profile == null) {
return;
}
- agentManager.getJobManager().submitSqlJobProfile(profile);
+ agentManager.getJobManager().submitJobProfile(profile, true);
}
/**