You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/12 03:47:12 UTC

[incubator-doris] 07/09: [Feature] Clean up old sync jobs regularly (#7061)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b1bc9f878f54e8f40c7599de1c307ae8c1e3bfeb
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Nov 12 10:53:50 2021 +0800

    [Feature] Clean up old sync jobs regularly (#7061)
    
    #7060
    #6287
    
    Each sync job that has been stopped for more than 3 days (configurable via
    Config.label_keep_max_second) will be permanently cleaned up.
---
 .../org/apache/doris/load/sync/SyncChecker.java    |  8 ++++-
 .../java/org/apache/doris/load/sync/SyncJob.java   | 17 ++++++++++
 .../org/apache/doris/load/sync/SyncJobManager.java | 38 ++++++++++++++++++++++
 .../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++++++++++
 4 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index 422fe48..8b64f34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -32,7 +32,7 @@ import java.util.List;
 public class SyncChecker extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(SyncChecker.class);
 
-    private SyncJobManager syncJobManager;
+    private final SyncJobManager syncJobManager;
 
     public SyncChecker(SyncJobManager syncJobManager) {
         super("sync checker", Config.sync_checker_interval_second * 1000L);
@@ -44,6 +44,7 @@ public class SyncChecker extends MasterDaemon {
         LOG.debug("start check sync jobs.");
         try {
             process();
+            cleanOldSyncJobs();
         } catch (Throwable e) {
             LOG.warn("Failed to process one round of SyncChecker", e);
         }
@@ -74,4 +75,9 @@ public class SyncChecker extends MasterDaemon {
             }
         }
     }
+
+    private void cleanOldSyncJobs() {
+        // the manager decides which jobs are expired and removes them
+        syncJobManager.cleanOldSyncJobs();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
index c0b0321..3565c04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateDataSyncJobStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
@@ -266,6 +267,18 @@ public abstract class SyncJob implements Writable {
         return "\\N";
     }
 
+    // Returns true for a completed job that finished more than Config.label_keep_max_second seconds ago.
+    public boolean isExpired(long currentTimeMs) {
+        if (!isCompleted()) {
+            return false; // jobs that have not completed never expire
+        }
+        // completed jobs must carry a finish timestamp
+        Preconditions.checkState(finishTimeMs != -1L);
+        long keepMs = Config.label_keep_max_second * 1000L;
+        long elapsedMs = currentTimeMs - finishTimeMs;
+        return elapsedMs > keepMs;
+    }
+
     // only use for persist when job state changed
     public static class SyncJobUpdateStateInfo implements Writable {
         @SerializedName(value = "id")
@@ -450,4 +463,8 @@ public abstract class SyncJob implements Writable {
     public List<ChannelDescription> getChannelDescriptions() {
         return this.channelDescriptions;
     }
+
+    public long getFinishTimeMs() {
+        return this.finishTimeMs; // NOTE(review): -1L appears to be the "unset" sentinel (see isExpired)
+    }
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index 1a0d8b8..5ac5049 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -41,6 +41,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -279,6 +280,43 @@ public class SyncJobManager implements Writable {
         }
     }
 
+    // Remove expired sync jobs (see SyncJob.isExpired, Config.label_keep_max_second). Called periodically.
+    // Expired jobs are dropped from idToSyncJob even when their dbIdToJobNameToSyncJobs entry is missing.
+    public void cleanOldSyncJobs() {
+        LOG.debug("begin to clean old sync jobs ");
+        long currentTimeMs = System.currentTimeMillis();
+        writeLock();
+        try {
+            Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator();
+            while (iterator.hasNext()) {
+                SyncJob syncJob = iterator.next().getValue();
+                if (!syncJob.isExpired(currentTimeMs)) {
+                    continue;
+                }
+                Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
+                List<SyncJob> list = (map == null) ? null : map.get(syncJob.getJobName());
+                if (list != null) {
+                    list.remove(syncJob);
+                    if (list.isEmpty()) {
+                        map.remove(syncJob.getJobName());
+                    }
+                }
+                if (map != null && map.isEmpty()) {
+                    dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
+                }
+                iterator.remove();
+                LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
+                        .add("finishTimeMs", syncJob.getFinishTimeMs())
+                        .add("currentTimeMs", currentTimeMs)
+                        .add("jobState", syncJob.getJobState())
+                        .add("msg", "old sync job has been cleaned")
+                );
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     public SyncJob getSyncJobById(long jobId) {
         return idToSyncJob.get(jobId);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
index 8fa080f..457fe43 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -405,5 +405,41 @@ public class SyncJobManagerTest {
         Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
     }
 
+    @Test
+    public void testCleanOldSyncJobs() { // an expired job must be removed from both manager indexes
+        SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        // put the job into CANCELLED before handing it to the manager
+        try {
+            canalSyncJob.updateState(JobState.CANCELLED, false);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+
+        SyncJobManager manager = new SyncJobManager();
+
+        // register the job in both manager indexes, injected directly via Deencapsulation.setField below
+        Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+        idToSyncJob.put(jobId, canalSyncJob);
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs);
+
+        Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+        // partially mock isExpired so the outcome does not depend on finish time or Config values
+        new Expectations(canalSyncJob) {
+            {
+                canalSyncJob.isExpired(anyLong);
+                result = true;
+            }
+        };
+        manager.cleanOldSyncJobs();
+        // the job must be gone from idToSyncJob, and the now-empty db entry must be pruned
+        Assert.assertEquals(0, idToSyncJob.size());
+        Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
+    }
+
 
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org