You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/06/15 22:04:19 UTC

[gobblin] branch master updated: Fix running counts for retried flows (#3520)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5684ed76b Fix running counts for retried flows (#3520)
5684ed76b is described below

commit 5684ed76b5054f7727ed4187e5711f962ceabac6
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Jun 15 15:04:14 2022 -0700

    Fix running counts for retried flows (#3520)
---
 .../apache/gobblin/service/modules/orchestration/DagManager.java | 3 ++-
 .../gobblin/service/modules/orchestration/DagManagerTest.java    | 9 +++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f57169553..dcfbe9cdf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -963,7 +963,8 @@ public class DagManager extends AbstractIdleService {
         // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
         // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
         // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
-        if (this.metricContext != null) {
+        // Only increment on the first attempt so that retried flows are not double-counted
+        if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
           getRunningJobsCounter(dagNode).inc();
           getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
         }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c1b0b80ef..216e28525 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -990,10 +990,19 @@ public class DagManagerTest {
 
     // Dag1 is running
     this._dagManagerThread.run();
+    SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
     // Dag1 fails and is orchestrated again
     this._dagManagerThread.run();
     // Dag1 is running again
     this._dagManagerThread.run();
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
     // Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit
     this._dagManagerThread.run();