Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/03/18 21:07:25 UTC

[GitHub] [gobblin] Will-Lo opened a new pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Will-Lo opened a new pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481


   …jobs
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1624
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   In GaaS, during a rollout or leader swap, the DagManager can emit an inaccurate count of running jobs and can miscount the quotas used by those running jobs.
   
   For example, if the leader is shut down while it is tracking 10 running jobs, and 5 of these jobs complete during the restart, the new leader would emit that 0 jobs are currently running, because it does not treat the job counters as idempotent. Additionally, we over-decrement the counters because we do not differentiate between jobs that fail while running on the executor and jobs that fail on the GaaS side.
   
   We should track currently running jobs more accurately, so that counters and quotas are only decremented for jobs that are actually running on the executor, and so that this tracking survives a restart.
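To make the accounting problem concrete, the following is a minimal sketch (not the code in this PR). It contrasts two approaches: a plain running-jobs counter that a new leader restarts at zero, and a set of running job IDs that is rebuilt from persisted state on startup. The class `RestartAccountingDemo` and the `job-N` IDs are hypothetical. In GaaS the persisted state would come from the DAG state store. Clamping the naive counter at zero is an assumption made here to mirror the "0 jobs running" symptom.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical illustration of the leader-restart scenario: a plain counter that
 * restarts at zero on the new leader versus a set of running job IDs rebuilt from
 * the persisted DAG state.
 */
class RestartAccountingDemo {
  // Restarts at 0 on the new leader; clamped (an assumption) so it never goes negative.
  int naiveRunningCount = 0;
  // Rebuilt from persisted state on startup; set membership makes updates idempotent.
  final Set<String> runningJobIds = ConcurrentHashMap.newKeySet();

  void onStartup(Collection<String> persistedRunningJobIds) {
    runningJobIds.addAll(persistedRunningJobIds);  // the naive counter has no equivalent step
  }

  void onJobFinished(String jobId) {
    naiveRunningCount = Math.max(0, naiveRunningCount - 1);
    runningJobIds.remove(jobId);  // no-op for unknown or already-finished jobs
  }

  int trackedRunningCount() {
    return runningJobIds.size();
  }
}
```

Suppose 10 jobs are persisted as running and 5 completion events arrive after the restart. The naive counter ends at 0, while the set-based count is 5. A repeated completion event for the same job leaves the set-based count unchanged.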
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Added tests in the DagManagerTest class; tests for the QuotaManager will be expanded as its functionality grows.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#issuecomment-1072834527


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3481](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7e56e65) into [master](https://codecov.io/gh/apache/gobblin/commit/8ffe72bcb9911710ba0fb9a345c605692f270493?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ffe72b) will **increase** coverage by `0.05%`.
   > The diff coverage is `60.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3481      +/-   ##
   ============================================
   + Coverage     46.62%   46.67%   +0.05%     
   - Complexity    10359    10390      +31     
   ============================================
     Files          2076     2077       +1     
     Lines         81064    81128      +64     
     Branches       9049     9056       +7     
   ============================================
   + Hits          37795    37867      +72     
   + Misses        39787    39776      -11     
   - Partials       3482     3485       +3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...blin/service/modules/orchestration/DagManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL0RhZ01hbmFnZXIuamF2YQ==) | `77.29% <31.25%> (+5.99%)` | :arrow_up: |
   | [...ervice/modules/orchestration/UserQuotaManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL1VzZXJRdW90YU1hbmFnZXIuamF2YQ==) | `65.47% <65.47%> (ø)` | |
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [.../org/apache/gobblin/async/AsyncDataDispatcher.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2FzeW5jL0FzeW5jRGF0YURpc3BhdGNoZXIuamF2YQ==) | `79.66% <0.00%> (-8.48%)` | :arrow_down: |
   | [...he/gobblin/source/PartitionAwareFileRetriever.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9QYXJ0aXRpb25Bd2FyZUZpbGVSZXRyaWV2ZXIuamF2YQ==) | `48.14% <0.00%> (-7.41%)` | :arrow_down: |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | :arrow_down: |
   | [...ce/modules/flowgraph/pathfinder/BFSPathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvcGF0aGZpbmRlci9CRlNQYXRoRmluZGVyLmphdmE=) | `75.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `70.00% <0.00%> (-2.23%)` | :arrow_down: |
   | [.../gobblin/cluster/HelixRetriggeringJobCallable.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhSZXRyaWdnZXJpbmdKb2JDYWxsYWJsZS5qYXZh) | `60.24% <0.00%> (-1.05%)` | :arrow_down: |
   | [...dules/flowgraph/pathfinder/AbstractPathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvcGF0aGZpbmRlci9BYnN0cmFjdFBhdGhGaW5kZXIuamF2YQ==) | `86.06% <0.00%> (-1.01%)` | :arrow_down: |
   | ... and [19 more](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8ffe72b...7e56e65](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [gobblin] Will-Lo commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r837965163



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +364,29 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        UserQuotaManager quotaManager = new UserQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              // Add all the currently running Dags to the quota limit per user
+              try {
+                quotaManager.checkQuota(dagNode);
+              } catch (IOException e) {
+                // Quota is somehow exceeded with currently running jobs, we should never hit this state normally
+                // but we should avoid stalling the entire service
+                log.error(String.format("Quota exceeded during initialization in DagManager for job name: %s",

Review comment:
       Ah, I didn't think about that edge case, nice catch. Even so, it would happen during startup, so I don't see a reason to fail a job that's still running on Azkaban just because the quota was decreased. At most, we should log it as a warning and reject additional jobs until the number of currently running jobs is back under the quota.
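Below is a rough sketch of that startup behaviour, under simplifying assumptions:

- A single per-user count stands in for the separate proxy-user and requester quotas.
- `java.util.logging` stands in for the service's logger.
- `StartupQuotaSketch`, `registerRunningAtStartup`, `tryAdmit` and `release` are hypothetical names, not `UserQuotaManager` methods.

Jobs that are already running are always counted, with a warning when the user is over quota. Only new submissions are rejected.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

/** Hypothetical sketch: count running jobs at startup unconditionally, reject only new ones. */
class StartupQuotaSketch {
  private static final Logger LOG = Logger.getLogger(StartupQuotaSketch.class.getName());
  private final ConcurrentHashMap<String, Integer> jobCountPerUser = new ConcurrentHashMap<>();
  private final int quota;

  StartupQuotaSketch(int quota) {
    this.quota = quota;
  }

  /** Jobs found running at startup are always counted, even above the quota; we only warn. */
  void registerRunningAtStartup(String user) {
    int count = jobCountPerUser.merge(user, 1, Integer::sum);
    if (count > quota) {
      LOG.warning(String.format("User %s has %d running jobs at startup, %d above quota %d",
          user, count, count - quota, quota));
    }
  }

  /** Admits a new job only while the user is under quota; check and increment are one atomic step. */
  boolean tryAdmit(String user) {
    boolean[] admitted = {false};
    jobCountPerUser.compute(user, (u, c) -> {
      int current = (c == null) ? 0 : c;
      if (current < quota) {
        admitted[0] = true;
        return current + 1;
      }
      return c;  // unchanged
    });
    return admitted[0];
  }

  /** Releases one job for the user, removing the entry when it reaches zero. */
  void release(String user) {
    jobCountPerUser.computeIfPresent(user, (u, c) -> c > 1 ? c - 1 : null);
  }

  int count(String user) {
    return jobCountPerUser.getOrDefault(user, 0);
  }
}
```

For example, with a quota of 2 and three jobs found running at startup, new submissions are rejected until two of those jobs have finished.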

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +364,29 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        UserQuotaManager quotaManager = new UserQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              // Add all the currently running Dags to the quota limit per user
+              try {
+                quotaManager.checkQuota(dagNode);
+              } catch (IOException e) {
+                // Quota is somehow exceeded with currently running jobs, we should never hit this state normally
+                // but we should avoid stalling the entire service
+                log.error(String.format("Quota exceeded during initialization in DagManager for job name: %s",

Review comment:
       Will add an additional test case for this
   







[GitHub] [gobblin] phet commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r837052036



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +364,29 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        UserQuotaManager quotaManager = new UserQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              // Add all the currently running Dags to the quota limit per user
+              try {
+                quotaManager.checkQuota(dagNode);
+              } catch (IOException e) {
+                // Quota is somehow exceeded with currently running jobs, we should never hit this state normally
+                // but we should avoid stalling the entire service
+                log.error(String.format("Quota exceeded during initialization in DagManager for job name: %s",

Review comment:
       couldn't this occur whenever the service has been redeployed w/ a different config in which a given user's quota has been reduced?  that feels like a not-so-severe condition, although def worth logging.  we must track all running jobs (even when over the quota), so we accurately publish the count per user (and overall), regardless of whether we rely on a set-membership check to guard 'decrement'.  (in general this all deserves commenting for maintainers, possibly within `UserQuotaManager`.)
   
   BTW, glad to see more info captured in the log, such as the user... it might also be nice to include the number of jobs beyond the allotment.
   
   ensuring that an overrun gracefully transitions to the reduced allotment probably merits a specific test case.
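For the log message, the number of jobs beyond the allotment can be derived from the count observed before the current job was added. The small sketch below assumes `countBeforeIncrement` is that pre-increment value, i.e. what `incrementJobCountAndCheckUserQuota` reads from the map. `QuotaMessages` and its methods are hypothetical.

Sharing one helper between the proxy-user and requester messages would also keep the two messages consistent. In the `UserQuotaManager` diff below, the proxy-user message adds 1 before subtracting the quota, while the requester message does not.

```java
/** Hypothetical helper for quota-exceeded log messages. */
final class QuotaMessages {
  private QuotaMessages() {}

  /** Number of jobs beyond the quota once the current job is counted; 0 if still within quota. */
  static int jobsAboveQuota(int countBeforeIncrement, int quota) {
    return Math.max(0, countBeforeIncrement + 1 - quota);
  }

  static String quotaExceededMessage(String user, String executorUri, int countBeforeIncrement, int quota) {
    return String.format("Quota exceeded for user %s on executor %s : quota=%d, requests above quota=%d",
        user, executorUri, quota, jobsAboveQuota(countBeforeIncrement, quota));
  }
}
```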

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Manages the statically configured user quotas for both the proxy user in user.to.proxy configuration and the API requester(s)
+ * Is used by the dag manager to ensure that the number of currently running jobs do not exceed the quota, if the quota
+ * is exceeded, then the execution will fail without running on the underlying executor
+ */
+@Slf4j
+public class UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> perUserQuota;
+  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+  private final int defaultQuota;
+
+  UserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
+
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+      mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = mapBuilder.build();
+  }
+
+  /**
+   * Checks if the dagNode exceeds the statically configured user quota for both the proxy user and requester user
+   * @throws IOException if the quota is exceeded, and logs a statement
+   */
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    int proxyQuotaIncrement;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
+
+    if (proxyUser != null) {
+      proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since count starts at 0, and is negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null) {
+      List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
+          .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
+        usersQuotaIncrement.add(requester);
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
+              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    // Throw errors for reach quota at the end to avoid inconsistent job counts
+    if (!proxyUserCheck || !requesterCheck) {
+      // roll back the increased counts in this block
+      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsageForUsers(usersQuotaIncrement);
+      runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+      throw new IOException(requesterMessage.toString());
+    }
+  }
+
+  /**
+   * Increment quota by one for the given map and key.
+   * @return a negative number if quota is already reached for this user
+   *         a positive number if the quota is not reached for this user
+   *         the absolute value of the number is the used quota before this increment request
+   *         0 if quota usage is not changed
+   */
+  private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, Dag.DagNode<JobExecutionPlan> dagNode) {
+    String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
+
+    // Only increment job count for first attempt, since job is considered running between retries
+    if (dagNode.getValue().getCurrentAttempts() != 1) {
+      return 0;
+    }
+
+    Integer currentCount;
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
+
+    if (currentCount == null) {
+      currentCount = 0;
+    }
+
+    if (currentCount >= getQuotaForUser(user)) {
+      return -currentCount; // increment must have crossed the quota
+    } else {
+      return currentCount;
+    }
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
+   * Returns true if the dag existed in the set of running dags and was removed successfully
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    if (val == null) {
+      return false;
+    }
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    if (proxyUser != null) {
+      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
+    }
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    if (serializedRequesters != null) {
+      try {
+        for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+          String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
+          decrementQuotaUsage(requesterToJobCount, requesterKey);
+        }
+      } catch (IOException e) {
+        log.error("Failed to release quota for requester list " + serializedRequesters, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
+    Integer currentCount;
+    if (user == null) {
+      return;
+    }
+    do {
+      currentCount = quotaMap.get(user);
+    } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));

Review comment:
       maybe a comment, like, "mods must be thread-safe since DAGs on different DMThreads may concern the same user"?
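Besides adding the comment, one option is to rely on `ConcurrentHashMap`'s atomic per-key operations (`merge`, `computeIfPresent`) instead of hand-written `get`/`replace` retry loops. This is an alternative sketch, not what the PR does. `AtomicUserCounts` is a hypothetical name. Like `decrementQuotaUsage`, it never lets a count drop below zero.

```java
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical alternative to get/replace retry loops. Modifications must be thread-safe,
 * since DAGs on different DagManagerThreads may concern the same user; ConcurrentHashMap
 * performs merge and computeIfPresent atomically for a given key.
 */
class AtomicUserCounts {
  private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();

  /** Increments the user's count and returns the value it had before this increment. */
  int increment(String user) {
    return counts.merge(user, 1, Integer::sum) - 1;
  }

  /** Decrements the user's count without going below zero; no-op for null or unknown users. */
  void decrement(String user) {
    if (user == null) {
      return;
    }
    counts.computeIfPresent(user, (u, c) -> Math.max(0, c - 1));
  }

  int get(String user) {
    return counts.getOrDefault(user, 0);
  }
}
```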







[GitHub] [gobblin] phet commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r832991943



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -90,6 +87,7 @@
 import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.eclipse.jetty.util.ConcurrentHashSet;

Review comment:
       not familiar w/ this one... it may be fine, but you could always just use `j.u.c.ConcurrentHashMap` w/ `true` for the value.
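For reference, the JDK (Java 8+) already offers concurrent sets without a Jetty dependency: `ConcurrentHashMap.newKeySet()`, and `Collections.newSetFromMap(...)` over a `ConcurrentHashMap`. The hypothetical helper below puts these next to the map-with-`true` approach suggested here. All three give atomic `add`/`remove` (or `putIfAbsent`/`remove`) on individual IDs.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helpers showing JDK-only ways to hold a thread-safe set of DAG IDs. */
final class ConcurrentSetOptions {
  private ConcurrentSetOptions() {}

  /** Set view backed by a ConcurrentHashMap (Java 8+). */
  static Set<String> newKeySet() {
    return ConcurrentHashMap.newKeySet();
  }

  /** Equivalent construction from a concurrent map with Boolean values. */
  static Set<String> fromConcurrentMap() {
    return Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  }

  /** The map-with-true approach: key presence is what matters, the value is a placeholder. */
  static ConcurrentHashMap<String, Boolean> mapWithTrue() {
    return new ConcurrentHashMap<>();
  }
}
```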

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/GobblinServiceQuotaManager.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Manages the statically configured user quotas for both the proxy user in user.to.proxy configuration and the API requester(s)
+ * Is used by the dag manager to ensure that the number of currently running jobs do not exceed the quota, if the quota
+ * is exceeded, then the execution will fail without running on the underlying executor
+ */
+@Slf4j
+public class GobblinServiceQuotaManager {

Review comment:
       nit: the package already includes `gobblin.service`, so there's possibly no need to repeat it in the class name.  maybe just name what kind of quota it is.  e.g. `UserQuotaManager` or `ConcurrentJobsPerUserQuotaManager`

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -444,41 +444,42 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
-    private final int defaultQuota;
-    private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
     private final ContextAwareMeter allSuccessfulMeter;
     private final ContextAwareMeter allFailedMeter;
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
-
+    private final GobblinServiceQuotaManager quotaManager;
     private final JobStatusRetriever jobStatusRetriever;
     private final DagStateStore dagStateStore;
     private final DagStateStore failedDagStateStore;
     private final BlockingQueue<Dag<JobExecutionPlan>> queue;
     private final BlockingQueue<String> cancelQueue;
     private final BlockingQueue<String> resumeQueue;
     private final Long defaultJobStartSlaTimeMillis;
+    private final Set<String> runningDags;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
-        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter, Long defaultJobStartSla) {
+        boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
+        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, GobblinServiceQuotaManager quotaManager,
+        Set<String> runningDags) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
       this.failedDagIds = failedDagIds;
       this.queue = queue;
       this.cancelQueue = cancelQueue;
       this.resumeQueue = resumeQueue;
-      this.defaultQuota = defaultQuota;
-      this.perUserQuota = perUserQuota;
       this.allSuccessfulMeter = allSuccessfulMeter;
       this.allFailedMeter = allFailedMeter;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
+      this.quotaManager = quotaManager;
+      this.runningDags = runningDags;

Review comment:
       these two co-occur, which makes sense.  any reason not to encapsulate the notion of `runningDags` within the `quotaManager`?  you could still have a method there to provide the current count of running DAGs (for monitoring/metrics)
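One possible shape for that encapsulation, sketched under these assumptions:

- The quota manager owns the set of running DAG IDs.
- It releases quota only for IDs it is tracking.
- It exposes a count for metrics.

`EncapsulatedQuotaManager`, `markRunning`, `releaseQuota(String, String)`, `getRunningDagCount` and `getJobCount` are hypothetical names. A single user key per DAG replaces the real proxy-user and requester handling.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: the quota manager owns the running DAG IDs, so DagManagerThread
 * needs no separate runningDags collection. The set and the per-user count are updated
 * in two separate atomic steps, so a concurrent reader may briefly see them disagree.
 */
class EncapsulatedQuotaManager {
  private final Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
  private final ConcurrentHashMap<String, Integer> jobCountPerUser = new ConcurrentHashMap<>();

  /** Records that a job actually started on the executor; repeated registrations count once. */
  void markRunning(String dagId, String user) {
    if (runningDagIds.add(dagId)) {
      jobCountPerUser.merge(user, 1, Integer::sum);
    }
  }

  /**
   * Releases quota only for DAGs tracked as running. GaaS-side failures or skips never
   * called markRunning, so they return false and decrement nothing.
   */
  boolean releaseQuota(String dagId, String user) {
    if (!runningDagIds.remove(dagId)) {
      return false;
    }
    jobCountPerUser.computeIfPresent(user, (u, c) -> Math.max(0, c - 1));
    return true;
  }

  /** Exposed for monitoring/metrics instead of sharing the set itself. */
  int getRunningDagCount() {
    return runningDagIds.size();
  }

  int getJobCount(String user) {
    return jobCountPerUser.getOrDefault(user, 0);
  }
}
```

On the `DagManagerThread` side, a job-completion handler could then decrement its metrics counters only when `releaseQuota` returns `true`. This also covers GaaS-side failures and skips that never reached the executor.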

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -1086,15 +1008,17 @@ private int getQuotaForUser(String user) {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
 
-      releaseQuota(dagNode);
-
-      if (this.metricContext != null) {
-        getRunningJobsCounter(dagNode).dec();
-        getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+      // Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
+      if (this.runningDags.contains(dagId)) {
+        quotaManager.releaseQuota(dagNode);
+        runningDags.remove(dagId);

Review comment:
       This check-then-set pattern doesn't look thread-safe. Is there some additional context that ensures exactly one thread would ever work with the same `dagId`? If so, please leave a comment explaining it for maintainers.
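One way to remove the race regardless of threading assumptions is a sketch like the following, assuming a concurrent set (the hypothetical `RunningDagReleaser` below stands in for the DagManager code, and the counter stands in for `quotaManager.releaseQuota(dagNode)`). `Set.remove` on a `ConcurrentHashMap.newKeySet()` is atomic and returns whether this caller actually removed the id, so the separate `contains` check is unnecessary:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: release quota at most once per dagId, even under concurrent completion events. */
class RunningDagReleaser {
  private final Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
  private final AtomicInteger releases = new AtomicInteger();

  void markRunning(String dagId) {
    runningDagIds.add(dagId);
  }

  /**
   * contains() followed by remove() is two steps another thread can interleave with.
   * remove() alone is atomic on a concurrent set and reports whether this caller removed
   * the id, so only that caller releases quota.
   */
  boolean onJobFinished(String dagId) {
    if (runningDagIds.remove(dagId)) {
      releases.incrementAndGet(); // stand-in for quotaManager.releaseQuota(dagNode)
      return true;
    }
    return false;
  }

  int releaseCount() {
    return releases.get();
  }

  /** Lets several threads report the same dagId as finished at once; returns the number of releases. */
  static int race(int threads) {
    RunningDagReleaser releaser = new RunningDagReleaser();
    releaser.markRunning("myGroup_myFlow_1647637645000");
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < threads; i++) {
      pool.submit(() -> {
        start.await(); // line all threads up before they race
        return releaser.onJobFinished("myGroup_myFlow_1647637645000");
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return releaser.releaseCount();
  }
}
```

Even if only one thread ever handles a given `dagId`, relying on the return value of `remove` costs nothing and keeps the invariant local to the call site.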

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +365,31 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        GobblinServiceQuotaManager quotaManager = new GobblinServiceQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        Set<String> runningDags = new ConcurrentHashSet<>();

Review comment:
       `runningDagIds`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#issuecomment-1072834527


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3481](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7e56e65) into [master](https://codecov.io/gh/apache/gobblin/commit/8ffe72bcb9911710ba0fb9a345c605692f270493?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ffe72b) will **increase** coverage by `2.61%`.
   > The diff coverage is `60.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3481      +/-   ##
   ============================================
   + Coverage     46.62%   49.23%   +2.61%     
   + Complexity    10359     8924    -1435     
   ============================================
     Files          2076     1696     -380     
     Lines         81064    65570   -15494     
     Branches       9049     7519    -1530     
   ============================================
   - Hits          37795    32284    -5511     
   + Misses        39787    30255    -9532     
   + Partials       3482     3031     -451     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...blin/service/modules/orchestration/DagManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL0RhZ01hbmFnZXIuamF2YQ==) | `77.29% <31.25%> (+5.99%)` | :arrow_up: |
   | [...ervice/modules/orchestration/UserQuotaManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL1VzZXJRdW90YU1hbmFnZXIuamF2YQ==) | `65.47% <65.47%> (ø)` | |
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [.../org/apache/gobblin/async/AsyncDataDispatcher.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2FzeW5jL0FzeW5jRGF0YURpc3BhdGNoZXIuamF2YQ==) | `79.66% <0.00%> (-8.48%)` | :arrow_down: |
   | [...he/gobblin/source/PartitionAwareFileRetriever.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9QYXJ0aXRpb25Bd2FyZUZpbGVSZXRyaWV2ZXIuamF2YQ==) | `48.14% <0.00%> (-7.41%)` | :arrow_down: |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | :arrow_down: |
   | [...ce/modules/flowgraph/pathfinder/BFSPathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvcGF0aGZpbmRlci9CRlNQYXRoRmluZGVyLmphdmE=) | `75.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `70.00% <0.00%> (-2.23%)` | :arrow_down: |
   | [.../gobblin/cluster/HelixRetriggeringJobCallable.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhSZXRyaWdnZXJpbmdKb2JDYWxsYWJsZS5qYXZh) | `60.24% <0.00%> (-1.05%)` | :arrow_down: |
   | [...dules/flowgraph/pathfinder/AbstractPathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvcGF0aGZpbmRlci9BYnN0cmFjdFBhdGhGaW5kZXIuamF2YQ==) | `86.06% <0.00%> (-1.01%)` | :arrow_down: |
   | ... and [400 more](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8ffe72b...7e56e65](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   



[GitHub] [gobblin] umustafi commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r836957917



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +365,31 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        GobblinServiceQuotaManager quotaManager = new GobblinServiceQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        Set<String> runningDags = new ConcurrentHashSet<>();
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              runningDags.add(DagManagerUtils.generateDagId(dagNode));
+              // Add all the currently running Dags to the quota limit per user
+              try {
+                quotaManager.checkQuota(dagNode);
+              } catch (IOException e) {
+                // Quota is somehow exceeded with currently running jobs, we should never hit this state normally
+                // but we should avoid stalling the entire service
+                log.error(String.format("Quota exceeded during initialization in DagManager for job name: %s",

Review comment:
       This is exceeding the quota of the entire DagManager, not the per-user quota, right?
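To make it unambiguous which limit is exceeded during initialization, the startup pass could rebuild state from scratch and record the offending user instead of logging a generic message. A minimal sketch, assuming a hypothetical `StoredJobNode` stand-in for a persisted `DagNode` (its DAG id, owning user, and last known status) and a per-user quota map with a default, mirroring the loop above that calls `checkQuota` once per running node:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of rebuilding running-job state when a new leader starts. */
class StartupRehydration {
  /** Hypothetical stand-in for a persisted DagNode. */
  static final class StoredJobNode {
    final String dagId;
    final String user;
    final String status;

    StoredJobNode(String dagId, String user, String status) {
      this.dagId = dagId;
      this.user = user;
      this.status = status;
    }
  }

  final Set<String> runningDagIds = new HashSet<>();
  final Map<String, Integer> perUserRunning = new HashMap<>();
  final List<String> usersOverQuota = new ArrayList<>();

  /**
   * Clears and rebuilds all counts (so running it twice gives the same result) and
   * records, rather than throws on, users whose restored count exceeds their own quota.
   */
  void rehydrate(List<StoredJobNode> nodes, Map<String, Integer> perUserQuota, int defaultQuota) {
    runningDagIds.clear();
    perUserRunning.clear();
    usersOverQuota.clear();
    for (StoredJobNode node : nodes) {
      if (!"RUNNING".equals(node.status)) {
        continue; // only jobs still running on the executor hold quota
      }
      runningDagIds.add(node.dagId);
      int count = perUserRunning.merge(node.user, 1, Integer::sum);
      int quota = perUserQuota.getOrDefault(node.user, defaultQuota);
      if (count == quota + 1) {
        usersOverQuota.add(node.user); // per-user limit crossed; report each user once
      }
    }
  }
}
```

Rebuilding from the state store on every startup, rather than adjusting counters left over from the previous leader, is what makes this sketch idempotent across leader swaps.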





[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#issuecomment-1072834527


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3481](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (12ca18e) into [master](https://codecov.io/gh/apache/gobblin/commit/8ffe72bcb9911710ba0fb9a345c605692f270493?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ffe72b) will **decrease** coverage by `3.22%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3481      +/-   ##
   ============================================
   - Coverage     46.62%   43.40%   -3.23%     
   + Complexity    10359     2031    -8328     
   ============================================
     Files          2076      405    -1671     
     Lines         81064    17414   -63650     
     Branches       9049     2127    -6922     
   ============================================
   - Hits          37795     7558   -30237     
   + Misses        39787     9019   -30768     
   + Partials       3482      837    -2645     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `85.71% <0.00%> (-14.29%)` | :arrow_down: |
   | [.../gobblin/cluster/HelixRetriggeringJobCallable.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhSZXRyaWdnZXJpbmdKb2JDYWxsYWJsZS5qYXZh) | `60.24% <0.00%> (-1.05%)` | :arrow_down: |
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `20.00% <0.00%> (-0.35%)` | :arrow_down: |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `34.48% <0.00%> (ø)` | |
   | [...blin/service/modules/orchestration/DagManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL0RhZ01hbmFnZXIuamF2YQ==) | | |
   | [...e/gobblin/broker/iface/NotConfiguredException.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vYnJva2VyL2lmYWNlL05vdENvbmZpZ3VyZWRFeGNlcHRpb24uamF2YQ==) | | |
   | [.../writer/FileAwareInputStreamDataWriterBuilder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvd3JpdGVyL0ZpbGVBd2FyZUlucHV0U3RyZWFtRGF0YVdyaXRlckJ1aWxkZXIuamF2YQ==) | | |
   | [...blin/converter/string/ObjectToStringConverter.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9zdHJpbmcvT2JqZWN0VG9TdHJpbmdDb252ZXJ0ZXIuamF2YQ==) | | |
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | | |
   | ... and [1668 more](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8ffe72b...12ca18e](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   



[GitHub] [gobblin] codecov-commenter commented on pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#issuecomment-1072834527


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3481](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (83a67d1) into [master](https://codecov.io/gh/apache/gobblin/commit/8ffe72bcb9911710ba0fb9a345c605692f270493?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ffe72b) will **increase** coverage by `0.05%`.
   > The diff coverage is `59.80%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3481      +/-   ##
   ============================================
   + Coverage     46.62%   46.67%   +0.05%     
   - Complexity    10359    10378      +19     
   ============================================
     Files          2076     2077       +1     
     Lines         81064    81092      +28     
     Branches       9049     9051       +2     
   ============================================
   + Hits          37795    37849      +54     
   + Misses        39787    39755      -32     
   - Partials       3482     3488       +6     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...blin/service/modules/orchestration/DagManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL0RhZ01hbmFnZXIuamF2YQ==) | `77.22% <48.00%> (+5.92%)` | :arrow_up: |
   | [...ules/orchestration/GobblinServiceQuotaManager.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9vcmNoZXN0cmF0aW9uL0dvYmJsaW5TZXJ2aWNlUXVvdGFNYW5hZ2VyLmphdmE=) | `63.63% <63.63%> (ø)` | |
   | [...a/org/apache/gobblin/util/limiter/NoopLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9Ob29wTGltaXRlci5qYXZh) | `40.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | :arrow_down: |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `61.42% <0.00%> (-1.43%)` | :arrow_down: |
   | [.../gobblin/cluster/HelixRetriggeringJobCallable.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhSZXRyaWdnZXJpbmdKb2JDYWxsYWJsZS5qYXZh) | `60.24% <0.00%> (-1.05%)` | :arrow_down: |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `34.48% <0.00%> (ø)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ce/extractor/extract/restapi/RestApiConnector.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9yZXN0YXBpL1Jlc3RBcGlDb25uZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../org/apache/gobblin/metrics/event/TimingEvent.java](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9ldmVudC9UaW1pbmdFdmVudC5qYXZh) | `70.00% <0.00%> (ø)` | |
   | ... and [8 more](https://codecov.io/gh/apache/gobblin/pull/3481/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8ffe72b...83a67d1](https://codecov.io/gh/apache/gobblin/pull/3481?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   



[GitHub] [gobblin] Will-Lo commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r836681064



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -1086,15 +1008,17 @@ private int getQuotaForUser(String user) {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
 
-      releaseQuota(dagNode);
-
-      if (this.metricContext != null) {
-        getRunningJobsCounter(dagNode).dec();
-        getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+      // Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
+      if (this.runningDags.contains(dagId)) {
+        quotaManager.releaseQuota(dagNode);
+        runningDags.remove(dagId);

Review comment:
       Yes, only one thread would work with a given dagId. Even if there are concurrent executions with the same flowName and flowGroup, each has a unique flowExecutionId generated from the timestamp. I'll add a comment explaining this.
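The uniqueness argument can be illustrated with a sketch, assuming the DAG id is composed as `flowGroup_flowName_flowExecutionId` and that the execution id is a millisecond timestamp (the `DagIds.dagId` helper below is hypothetical, not the actual `DagManagerUtils` API). Two executions of the same flow then get distinct ids as long as they do not start in the same millisecond:

```java
/** Hypothetical helper mirroring a flowGroup/flowName/flowExecutionId composition. */
final class DagIds {
  private DagIds() {}

  /** Builds an id that differs between executions of the same flow via the execution timestamp. */
  static String dagId(String flowGroup, String flowName, long flowExecutionId) {
    return String.join("_", flowGroup, flowName, Long.toString(flowExecutionId));
  }
}
```

For example, `DagIds.dagId("myGroup", "myFlow", 1000L)` yields `myGroup_myFlow_1000`, while an execution one millisecond later yields `myGroup_myFlow_1001`.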





[GitHub] [gobblin] phet commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r833513178



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -444,41 +444,42 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
-    private final int defaultQuota;
-    private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
     private final ContextAwareMeter allSuccessfulMeter;
     private final ContextAwareMeter allFailedMeter;
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
-
+    private final GobblinServiceQuotaManager quotaManager;
     private final JobStatusRetriever jobStatusRetriever;
     private final DagStateStore dagStateStore;
     private final DagStateStore failedDagStateStore;
     private final BlockingQueue<Dag<JobExecutionPlan>> queue;
     private final BlockingQueue<String> cancelQueue;
     private final BlockingQueue<String> resumeQueue;
     private final Long defaultJobStartSlaTimeMillis;
+    private final Set<String> runningDags;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
-        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter, Long defaultJobStartSla) {
+        boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
+        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, GobblinServiceQuotaManager quotaManager,
+        Set<String> runningDags) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
       this.failedDagIds = failedDagIds;
       this.queue = queue;
       this.cancelQueue = cancelQueue;
       this.resumeQueue = resumeQueue;
-      this.defaultQuota = defaultQuota;
-      this.perUserQuota = perUserQuota;
       this.allSuccessfulMeter = allSuccessfulMeter;
       this.allFailedMeter = allFailedMeter;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
+      this.quotaManager = quotaManager;
+      this.runningDags = runningDags;

Review comment:
       There may even be reason for coordination between the two, such that a more sophisticated user quota or rate limiter could take the overall system load into account (potentially allowing some overrun to accommodate temporary bursts).
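To make this suggestion concrete, here is a minimal Java sketch of a per-user quota that also consults overall system load. It assumes a hypothetical `LoadAwareQuota` class with an illustrative per-user limit, a fixed burst allowance, a global capacity, and a "low load" threshold. None of these are existing Gobblin classes or configuration keys. While the total number of running jobs stays below the threshold, a user may run up to `userQuota + burstAllowance` jobs. Above it, the plain per-user quota applies, and the global capacity is never exceeded.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: a per-user quota that allows a temporary burst over the
 * user's limit while overall system load is low. Not a Gobblin API.
 */
class LoadAwareQuota {
  private final int userQuota;          // illustrative per-user limit on running jobs
  private final int burstAllowance;     // extra jobs a user may run while load is low
  private final int systemCapacity;     // hard ceiling on running jobs across all users
  private final double lowLoadFraction; // "low load" means total < capacity * fraction
  private final Map<String, Integer> runningPerUser = new HashMap<>();
  private int totalRunning = 0;

  LoadAwareQuota(int userQuota, int burstAllowance, int systemCapacity, double lowLoadFraction) {
    this.userQuota = userQuota;
    this.burstAllowance = burstAllowance;
    this.systemCapacity = systemCapacity;
    this.lowLoadFraction = lowLoadFraction;
  }

  /** Admits and counts the job if allowed; otherwise leaves all counters unchanged. */
  synchronized boolean tryAcquire(String user) {
    if (totalRunning >= systemCapacity) {
      return false; // never exceed the global ceiling, even when bursting
    }
    boolean lowLoad = totalRunning < systemCapacity * lowLoadFraction;
    int limit = lowLoad ? userQuota + burstAllowance : userQuota;
    if (runningPerUser.getOrDefault(user, 0) >= limit) {
      return false;
    }
    runningPerUser.merge(user, 1, Integer::sum);
    totalRunning++;
    return true;
  }

  /** Releases one job for the user; a no-op if none is counted, so counters never go negative. */
  synchronized void release(String user) {
    Integer current = runningPerUser.get(user);
    if (current == null) {
      return;
    }
    if (current == 1) {
      runningPerUser.remove(user);
    } else {
      runningPerUser.put(user, current - 1);
    }
    totalRunning--;
  }

  synchronized int running(String user) {
    return runningPerUser.getOrDefault(user, 0);
  }

  synchronized int totalRunning() {
    return totalRunning;
  }
}
```

Take a quota of 2, a burst of 1, a capacity of 10, and a 0.5 threshold. Under those settings, a user can reach 3 running jobs while fewer than 5 jobs are running overall. A user who starts submitting after the total reaches 5 is held to 2. Existing jobs are not preempted when load rises; only new admissions are affected.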







[GitHub] [gobblin] Will-Lo commented on a change in pull request #3481: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running …

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on a change in pull request #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r836965753



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -382,12 +365,31 @@ public synchronized void setActive(boolean active) {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        GobblinServiceQuotaManager quotaManager = new GobblinServiceQuotaManager(config);
+        // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+        Set<String> runningDags = new ConcurrentHashSet<>();
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              runningDags.add(DagManagerUtils.generateDagId(dagNode));
+              // Add all the currently running Dags to the quota limit per user
+              try {
+                quotaManager.checkQuota(dagNode);
+              } catch (IOException e) {
+                // Quota is somehow exceeded with currently running jobs, we should never hit this state normally
+                // but we should avoid stalling the entire service
+                log.error(String.format("Quota exceeded during initialization in DagManager for job name: %s",

Review comment:
       It would be a per-user quota, but since this happens at startup we should never exceed it in practice: we are only reloading the jobs that were already running before shutdown, and those could not have exceeded the quota.
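This reasoning holds only if two conditions are met. The startup counts must be rebuilt solely from jobs that were running before shutdown, and each job must be counted exactly once. The following Java sketch illustrates that idea under stated assumptions. `RunningJobTracker` and `PersistedJob` are hypothetical stand-ins, not Gobblin classes. A job is identified by a dag ID string, analogous to the `runningDags` set in the diff. The quota is a single illustrative per-user limit.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: rebuild per-user running-job counts at startup from the
 * dags persisted as RUNNING, keyed by dag ID so that counting is idempotent.
 * Class and method names are illustrative, not Gobblin's API.
 */
class RunningJobTracker {
  /** Stand-in for a persisted dag node: its ID, owning user, and whether it was RUNNING. */
  static final class PersistedJob {
    final String dagId;
    final String user;
    final boolean running;

    PersistedJob(String dagId, String user, boolean running) {
      this.dagId = dagId;
      this.user = user;
      this.running = running;
    }
  }

  private final int perUserQuota;
  private final Set<String> runningDagIds = new HashSet<>();
  private final Map<String, String> ownerByDagId = new HashMap<>();
  private final Map<String, Integer> countByUser = new HashMap<>();

  RunningJobTracker(int perUserQuota) {
    this.perUserQuota = perUserQuota;
  }

  /** Discards in-memory state and recounts only the jobs that were running before shutdown. */
  synchronized void rebuild(List<PersistedJob> persisted) {
    runningDagIds.clear();
    ownerByDagId.clear();
    countByUser.clear();
    for (PersistedJob job : persisted) {
      if (job.running) {
        register(job.dagId, job.user);
      }
    }
  }

  /** Counts a dag once; returns false if it was already counted. Quota is checked separately. */
  synchronized boolean register(String dagId, String user) {
    if (!runningDagIds.add(dagId)) {
      return false;
    }
    ownerByDagId.put(dagId, user);
    countByUser.merge(user, 1, Integer::sum);
    return true;
  }

  /** Decrements only for a dag that was actually counted, so counters cannot be over-decremented. */
  synchronized boolean release(String dagId) {
    if (!runningDagIds.remove(dagId)) {
      return false;
    }
    String user = ownerByDagId.remove(dagId);
    // Returning null from computeIfPresent removes the entry once the count reaches zero.
    countByUser.computeIfPresent(user, (u, c) -> c > 1 ? c - 1 : null);
    return true;
  }

  synchronized boolean atOrOverQuota(String user) {
    return count(user) >= perUserQuota;
  }

  synchronized int count(String user) {
    return countByUser.getOrDefault(user, 0);
  }
}
```

Registration is keyed on the dag ID. As a result, replaying the same persisted node twice during a leader swap does not inflate the count. Releasing a dag that was never counted is a no-op rather than a decrement. `register` deliberately does not reject on quota. This matches the diff's choice to log an error rather than stall the service if the reloaded jobs were somehow over quota.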



