You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/15 18:44:51 UTC
[gobblin] branch master updated: [GOBBLIN-1468] reset currentAttempt counter on resume flow
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1e03329 [GOBBLIN-1468] reset currentAttempt counter on resume flow
1e03329 is described below
commit 1e03329332a14277b7507e75127ffa5b30377757
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Jun 15 11:44:42 2021 -0700
[GOBBLIN-1468] reset currentAttempt counter on resume flow
Closes #3308 from arjun4084346/resetCurrentAttemptOnFlowResume
---
.../org/apache/gobblin/service/modules/orchestration/DagManager.java | 2 ++
.../apache/gobblin/service/modules/orchestration/DagManagerTest.java | 4 ++++
2 files changed, 6 insertions(+)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5c6dd1b..362dd0d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -545,6 +545,8 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
node.getValue().setExecutionStatus(PENDING_RESUME);
+ // reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job
+ node.getValue().setCurrentAttempts(0);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 40c17df..2b7044f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -199,6 +199,7 @@ public class DagManagerTest {
Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+ Assert.assertEquals(this.dags.get(dagId).getNodes().get(0).getValue().getCurrentAttempts(), 1);
//Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted.
this._dagManagerThread.run();
@@ -412,6 +413,9 @@ public class DagManagerTest {
Assert.assertFalse(this.failedDagIds.contains(dagId));
Assert.assertTrue(this.dags.containsKey(dagId));
+ // Verify the current attempt number
+ Assert.assertEquals(dag.getNodes().get(2).getValue().getCurrentAttempts(), 1);
+
// Job2 complete
this._dagManagerThread.run();
Assert.assertFalse(this.failedDagIds.contains(dagId));