You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/23 20:48:10 UTC
[gobblin] branch master updated: [GOBBLIN-1478] Fix concurrency
issue when iterating failedDagIds
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7e24f20 [GOBBLIN-1478] Fix concurrency issue when iterating failedDagIds
7e24f20 is described below
commit 7e24f20a4dcaa00d0c55ebaf81316fafbd2f96d5
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Wed Jun 23 13:48:05 2021 -0700
[GOBBLIN-1478] Fix concurrency issue when iterating failedDagIds
Closes #3318 from jack-moseley/failed-dag-id-fix
---
.../gobblin/service/modules/orchestration/DagManager.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 362dd0d..630ec1d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1275,20 +1275,18 @@ public class DagManager extends AbstractIdleService {
try {
log.info("Cleaning failed dag state store");
long startTime = System.currentTimeMillis();
- List<String> dagIdsToClean = new ArrayList<>();
+ int numCleaned = 0;
- for (String dagId : this.failedDagIds) {
+ Set<String> failedDagIdsCopy = new HashSet<>(this.failedDagIds);
+ for (String dagId : failedDagIdsCopy) {
if (this.failedDagRetentionTime > 0L && startTime > DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
this.failedDagStateStore.cleanUp(dagId);
- dagIdsToClean.add(dagId);
+ this.failedDagIds.remove(dagId);
+ numCleaned++;
}
}
- for (String dagId : dagIdsToClean) {
- this.failedDagIds.remove(dagId);
- }
-
- log.info("Cleaned " + dagIdsToClean.size() + " dags from the failed dag state store");
+ log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}