You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/06/15 22:04:19 UTC
[gobblin] branch master updated: Fix running counts for retried flows (#3520)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5684ed76b Fix running counts for retried flows (#3520)
5684ed76b is described below
commit 5684ed76b5054f7727ed4187e5711f962ceabac6
Author: William Lo <lo...@gmail.com>
AuthorDate: Wed Jun 15 15:04:14 2022 -0700
Fix running counts for retried flows (#3520)
---
.../apache/gobblin/service/modules/orchestration/DagManager.java | 3 ++-
.../gobblin/service/modules/orchestration/DagManagerTest.java | 9 +++++++++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f57169553..dcfbe9cdf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -963,7 +963,8 @@ public class DagManager extends AbstractIdleService {
// By this point the quota is allocated, so it's imperative to increment here: a missed increment would allow the counts to be decremented below zero upon quota release.
// Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
// When the ensuing Kafka message spurs DagManager processing, the quota is released and the counts are decremented
- if (this.metricContext != null) {
+ // Ensure that we do not double increment for flows that are retried
+ if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
getRunningJobsCounter(dagNode).inc();
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c1b0b80ef..216e28525 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -990,10 +990,19 @@ public class DagManagerTest {
// Dag1 is running
this._dagManagerThread.run();
+ SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
// Dag1 fails and is orchestrated again
this._dagManagerThread.run();
// Dag1 is running again
this._dagManagerThread.run();
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
// Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit
this._dagManagerThread.run();