You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/23 20:48:10 UTC

[gobblin] branch master updated: [GOBBLIN-1478] Fix concurrency issue when iterating failedDagIds

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e24f20  [GOBBLIN-1478] Fix concurrency issue when iterating failedDagIds
7e24f20 is described below

commit 7e24f20a4dcaa00d0c55ebaf81316fafbd2f96d5
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Wed Jun 23 13:48:05 2021 -0700

    [GOBBLIN-1478] Fix concurrency issue when iterating failedDagIds
    
    Closes #3318 from jack-moseley/failed-dag-id-fix
---
 .../gobblin/service/modules/orchestration/DagManager.java  | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 362dd0d..630ec1d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1275,20 +1275,18 @@ public class DagManager extends AbstractIdleService {
       try {
         log.info("Cleaning failed dag state store");
         long startTime = System.currentTimeMillis();
-        List<String> dagIdsToClean = new ArrayList<>();
+        int numCleaned = 0;
 
-        for (String dagId : this.failedDagIds) {
+        Set<String> failedDagIdsCopy = new HashSet<>(this.failedDagIds);
+        for (String dagId : failedDagIdsCopy) {
           if (this.failedDagRetentionTime > 0L && startTime > DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
             this.failedDagStateStore.cleanUp(dagId);
-            dagIdsToClean.add(dagId);
+            this.failedDagIds.remove(dagId);
+            numCleaned++;
           }
         }
 
-        for (String dagId : dagIdsToClean) {
-          this.failedDagIds.remove(dagId);
-        }
-
-      log.info("Cleaned " + dagIdsToClean.size() + " dags from the failed dag state store");
+      log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
       } catch (Exception e) {
         log.error("Failed to run retention on failed dag state store", e);
       }