You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/12 03:47:12 UTC
[incubator-doris] 07/09: [Feature] Clean up old sync jobs regularly
(#7061)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit b1bc9f878f54e8f40c7599de1c307ae8c1e3bfeb
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Nov 12 10:53:50 2021 +0800
[Feature] Clean up old sync jobs regularly (#7061)
#7060
#6287
Each job that has been stopped for more than 3 days(set with Config.label_keep_max_second)
will be permanently cleaned up.
---
.../org/apache/doris/load/sync/SyncChecker.java | 8 ++++-
.../java/org/apache/doris/load/sync/SyncJob.java | 17 ++++++++++
.../org/apache/doris/load/sync/SyncJobManager.java | 38 ++++++++++++++++++++++
.../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++++++++++
4 files changed, 98 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index 422fe48..8b64f34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -32,7 +32,7 @@ import java.util.List;
public class SyncChecker extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(SyncChecker.class);
- private SyncJobManager syncJobManager;
+ private final SyncJobManager syncJobManager;
public SyncChecker(SyncJobManager syncJobManager) {
super("sync checker", Config.sync_checker_interval_second * 1000L);
@@ -44,6 +44,7 @@ public class SyncChecker extends MasterDaemon {
LOG.debug("start check sync jobs.");
try {
process();
+ cleanOldSyncJobs();
} catch (Throwable e) {
LOG.warn("Failed to process one round of SyncChecker", e);
}
@@ -74,4 +75,9 @@ public class SyncChecker extends MasterDaemon {
}
}
}
+
+ private void cleanOldSyncJobs() {
+ // clean up expired sync jobs
+ this.syncJobManager.cleanOldSyncJobs();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
index c0b0321..3565c04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateDataSyncJobStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
@@ -266,6 +267,18 @@ public abstract class SyncJob implements Writable {
return "\\N";
}
+ public boolean isExpired(long currentTimeMs) {
+ if (!isCompleted()) {
+ return false;
+ }
+ Preconditions.checkState(finishTimeMs != -1L);
+ long expireTime = Config.label_keep_max_second * 1000L;
+ if ((currentTimeMs - finishTimeMs) > expireTime) {
+ return true;
+ }
+ return false;
+ }
+
// only use for persist when job state changed
public static class SyncJobUpdateStateInfo implements Writable {
@SerializedName(value = "id")
@@ -450,4 +463,8 @@ public abstract class SyncJob implements Writable {
public List<ChannelDescription> getChannelDescriptions() {
return this.channelDescriptions;
}
+
+ public long getFinishTimeMs() {
+ return finishTimeMs;
+ }
}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index 1a0d8b8..5ac5049 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -41,6 +41,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -279,6 +280,43 @@ public class SyncJobManager implements Writable {
}
}
+ // Remove old sync jobs. Called periodically.
+ // Stopped jobs will be removed after Config.label_keep_max_second.
+ public void cleanOldSyncJobs() {
+ LOG.debug("begin to clean old sync jobs ");
+ long currentTimeMs = System.currentTimeMillis();
+ writeLock();
+ try {
+ Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator();
+ while (iterator.hasNext()) {
+ SyncJob syncJob = iterator.next().getValue();
+ if (syncJob.isExpired(currentTimeMs)) {
+ if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
+ continue;
+ }
+ Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
+ List<SyncJob> list = map.get(syncJob.getJobName());
+ list.remove(syncJob);
+ if (list.isEmpty()) {
+ map.remove(syncJob.getJobName());
+ }
+ if (map.isEmpty()) {
+ dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
+ }
+ iterator.remove();
+ LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
+ .add("finishTimeMs", syncJob.getFinishTimeMs())
+ .add("currentTimeMs", currentTimeMs)
+ .add("jobState", syncJob.getJobState())
+ .add("msg", "old sync job has been cleaned")
+ );
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
public SyncJob getSyncJobById(long jobId) {
return idToSyncJob.get(jobId);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
index 8fa080f..457fe43 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -405,5 +405,41 @@ public class SyncJobManagerTest {
Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
}
+ @Test
+ public void testCleanOldSyncJobs() {
+ SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ // change sync job state to cancelled
+ try {
+ canalSyncJob.updateState(JobState.CANCELLED, false);
+ } catch (UserException e) {
+ Assert.fail();
+ }
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+
+ SyncJobManager manager = new SyncJobManager();
+
+ // add a sync job to manager
+ Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+ idToSyncJob.put(jobId, canalSyncJob);
+ Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+ Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs);
+
+ Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+ new Expectations(canalSyncJob) {
+ {
+ canalSyncJob.isExpired(anyLong);
+ result = true;
+ }
+ };
+ manager.cleanOldSyncJobs();
+
+ Assert.assertEquals(0, idToSyncJob.size());
+ Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org