You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "Will-Lo (via GitHub)" <gi...@apache.org> on 2023/02/03 21:47:40 UTC

[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3635: [GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync in memory state with mysql table

Will-Lo commented on code in PR #3635:
URL: https://github.com/apache/gobblin/pull/3635#discussion_r1096309836


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -272,6 +275,9 @@ protected void startUp() {
    * @param setStatus if true, set all jobs in the dag to pending
    */
   synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
+    if (!this.isActive) {
+      return;
+    }

Review Comment:
   Would this ever cause dags to be missed? If during the process where one new dag is being added and then the leader is changed



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -405,10 +411,15 @@ public synchronized void setActive(boolean active) {
         }
         FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
         this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
-        List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
-        log.info("Loading " + dags.size() + " dags from dag state store");
-        for (Dag<JobExecutionPlan> dag : dags) {
-          addDag(dag, false, false);
+        loadingDagsFromDagStateStore();
+        this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor();
+        for (int delay = houseKeepingThreadInitialDelay; delay < 180; delay *= 2) {

Review Comment:
   make 180 a static variable named: `MAX_HOUSEKEEPING_THREAD_DELAY` ?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -441,6 +453,14 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  private void loadingDagsFromDagStateStore() throws IOException {

Review Comment:
   Tiny grammar nit: rename to loadDagFromDagStateStore? Since when it reads it's more declarative



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org