You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/15 18:44:51 UTC

[gobblin] branch master updated: [GOBBLIN-1468] reset currentAttempt counter on resume flow

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e03329  [GOBBLIN-1468] reset currentAttempt counter on resume flow
1e03329 is described below

commit 1e03329332a14277b7507e75127ffa5b30377757
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Jun 15 11:44:42 2021 -0700

    [GOBBLIN-1468] reset currentAttempt counter on resume flow
    
    Closes #3308 from
    arjun4084346/resetCurrentAttemptOnFlowResume
---
 .../org/apache/gobblin/service/modules/orchestration/DagManager.java  | 2 ++
 .../apache/gobblin/service/modules/orchestration/DagManagerTest.java  | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5c6dd1b..362dd0d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -545,6 +545,8 @@ public class DagManager extends AbstractIdleService {
         ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
         if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
           node.getValue().setExecutionStatus(PENDING_RESUME);
+          // Reset currentAttempts so that attempts from the previous execution are not counted when deciding whether to retry a job.
+          node.getValue().setCurrentAttempts(0);
           Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
           this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
         }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 40c17df..2b7044f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -199,6 +199,7 @@ public class DagManagerTest {
     Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
     Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
     Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+    Assert.assertEquals(this.dags.get(dagId).getNodes().get(0).getValue().getCurrentAttempts(), 1);
 
     //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted.
     this._dagManagerThread.run();
@@ -412,6 +413,9 @@ public class DagManagerTest {
     Assert.assertFalse(this.failedDagIds.contains(dagId));
     Assert.assertTrue(this.dags.containsKey(dagId));
 
+    // Verify the current attempt number
+    Assert.assertEquals(dag.getNodes().get(2).getValue().getCurrentAttempts(), 1);
+
     // Job2 complete
     this._dagManagerThread.run();
     Assert.assertFalse(this.failedDagIds.contains(dagId));