You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/11 21:42:17 UTC

[gobblin] branch master updated: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c4290e0 [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)
74c4290e0 is described below

commit 74c4290e0b0cb46be7b5820d85486978c2939b0d
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Apr 11 14:42:12 2022 -0700

    [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)
    
    * Refactor quota management, fix various bugs in accounting of running jobs
    
    * Add javadocs
    
    * Address comments, add metric counts to tests
    
    * Address scenario on startup where quota is decreased
    
    * rename onstartup to onInit
---
 .../service/modules/orchestration/DagManager.java  | 178 +++---------------
 .../modules/orchestration/UserQuotaManager.java    | 198 +++++++++++++++++++++
 .../modules/orchestration/DagManagerTest.java      | 153 +++++++++++++++-
 .../orchestration/UserQuotaManagerTest.java        |  71 ++++++++
 4 files changed, 438 insertions(+), 162 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index b69fb2695..2d89591e1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -39,14 +38,12 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
@@ -138,15 +135,10 @@ public class DagManager extends AbstractIdleService {
   private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
   public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
   public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
-  private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
-  private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
-  private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
   // Default job start SLA time if configured, measured in minutes. Default is 10 minutes
   private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
 
-  private static final String QUOTA_SEPERATOR = ":";
-
   /**
    * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
    * <ul>
@@ -191,8 +183,6 @@ public class DagManager extends AbstractIdleService {
   private final JobStatusRetriever jobStatusRetriever;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
-  private final int defaultQuota;
-  private final Map<String, Integer> perUserQuota;
   private final long failedDagRetentionTime;
 
   private volatile boolean isActive = false;
@@ -213,18 +203,9 @@ public class DagManager extends AbstractIdleService {
     } else {
       this.eventSubmitter = Optional.absent();
     }
-
-    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
-    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
-      mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
-    }
-    this.perUserQuota = mapBuilder.build();
-
     this.jobStatusRetriever = jobStatusRetriever;
-
     TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
     this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
   }
@@ -374,6 +355,7 @@ public class DagManager extends AbstractIdleService {
 
         ContextAwareMeter allSuccessfulMeter = null;
         ContextAwareMeter allFailedMeter = null;
+
         if (instrumentationEnabled) {
           MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
           allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -382,12 +364,22 @@ public class DagManager extends AbstractIdleService {
               ServiceMetricNames.FAILED_FLOW_METER));
         }
 
+        UserQuotaManager quotaManager = new UserQuotaManager(config);
+        // Before initializing the DagManagerThreads, check which dags were still running before the previous shutdown
+        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+              // Count every currently running job against its users' quotas; onInit=true means an exceeded quota does not throw here
+              quotaManager.checkQuota(dagNode, true);
+            }
+          }
+        }
         //On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
-              allSuccessfulMeter, allFailedMeter, this.defaultJobStartSlaTimeMillis);
+              runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
+              allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -431,8 +423,6 @@ public class DagManager extends AbstractIdleService {
    */
   public static class DagManagerThread implements Runnable {
     private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
-    private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
-    private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
     private final Set<String> failedDagIds;
     private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
@@ -444,15 +434,13 @@ public class DagManager extends AbstractIdleService {
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
-    private final int defaultQuota;
-    private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
     private final ContextAwareMeter allSuccessfulMeter;
     private final ContextAwareMeter allFailedMeter;
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
-
+    private final UserQuotaManager quotaManager;
     private final JobStatusRetriever jobStatusRetriever;
     private final DagStateStore dagStateStore;
     private final DagStateStore failedDagStateStore;
@@ -465,8 +453,8 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
-        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter, Long defaultJobStartSla) {
+        boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
+        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -474,11 +462,11 @@ public class DagManager extends AbstractIdleService {
       this.queue = queue;
       this.cancelQueue = cancelQueue;
       this.resumeQueue = resumeQueue;
-      this.defaultQuota = defaultQuota;
-      this.perUserQuota = perUserQuota;
       this.allSuccessfulMeter = allSuccessfulMeter;
       this.allFailedMeter = allFailedMeter;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
+      this.quotaManager = quotaManager;
+
       if (instrumentationEnabled) {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
         this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
@@ -675,6 +663,7 @@ public class DagManager extends AbstractIdleService {
           addJobState(dagId, dagNode);
           //Update the running jobs counter.
           getRunningJobsCounter(dagNode).inc();
+          getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
           isDagRunning = true;
         }
       }
@@ -951,7 +940,7 @@ public class DagManager extends AbstractIdleService {
       // Run this spec on selected executor
       SpecProducer<Spec> producer;
       try {
-        checkQuota(dagNode);
+        quotaManager.checkQuota(dagNode, false);
         producer = DagManagerUtils.getSpecProducer(dagNode);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
@@ -990,87 +979,6 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
-      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
-      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-      boolean proxyUserCheck = true;
-      int proxyQuotaIncrement;
-      Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
-      StringBuilder requesterMessage = new StringBuilder();
-
-      if (proxyUser != null) {
-        proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
-        proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
-        if (!proxyUserCheck) {
-          requesterMessage.append(String.format(
-              "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
-              proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)));
-        }
-      }
-
-      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
-      boolean requesterCheck = true;
-
-      if (serializedRequesters != null) {
-        List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
-            .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
-        for (String requester : uniqueRequesters) {
-          int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
-          boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
-          usersQuotaIncrement.add(requester);
-          requesterCheck = requesterCheck && thisRequesterCheck;
-          if (!thisRequesterCheck) {
-            requesterMessage.append(String.format(
-                "Quota exceeded for requester %s on executor %s : quota=%s, runningJobs=%d%n",
-                requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)));
-          }
-        }
-      }
-
-      // Throw errors for reach quota at the end to avoid inconsistent job counts
-      if (!proxyUserCheck || !requesterCheck) {
-        // roll back the increased counts in this block
-        decrementQuotaUsage(proxyUserToJobCount, proxyUser);
-        decrementQuotaUsageForUsers(usersQuotaIncrement);
-        throw new IOException(requesterMessage.toString());
-      }
-    }
-
-    /**
-     * Increment quota by one for the given map and key.
-     * @return a negative number if quota is already reached for this user
-     *         a positive number if the quota is not reached for this user
-     *         the absolute value of the number is the used quota before this increment request
-     *         0 if quota usage is not changed
-     */
-    private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
-      String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
-
-      // Only increment job count for first attempt, since job is considered running between retries
-      if (dagNode.getValue().getCurrentAttempts() != 1) {
-        return 0;
-      }
-
-      Integer currentCount;
-      do {
-        currentCount = quotaMap.get(key);
-      } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
-
-      if (currentCount == null) {
-        currentCount = 0;
-      }
-
-      if (currentCount >= getQuotaForUser(user)) {
-        return -currentCount; // increment must have crossed the quota
-      } else {
-        return currentCount;
-      }
-    }
-
-    private int getQuotaForUser(String user) {
-      return perUserQuota.getOrDefault(user, defaultQuota);
-    }
-
     /**
      * Method that defines the actions to be performed when a job finishes either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions as necessary.
@@ -1085,16 +993,13 @@ public class DagManager extends AbstractIdleService {
       String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
-
-      releaseQuota(dagNode);
-
-      if (this.metricContext != null) {
+      // Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
+      if (quotaManager.releaseQuota(dagNode) && this.metricContext != null) {
         getRunningJobsCounter(dagNode).dec();
         getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
       }
 
       switch (jobStatus) {
-        // TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
         case FAILED:
           dag.setMessage("Flow failed because job " + jobName + " failed");
           if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
@@ -1118,45 +1023,6 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    /**
-     * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link DagNode}.
-     */
-    private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
-      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
-      if (proxyUser != null) {
-        String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-        decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
-      }
-
-      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
-      if (serializedRequesters != null) {
-        try {
-          for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
-            String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
-            decrementQuotaUsage(requesterToJobCount, requesterKey);
-          }
-        } catch (IOException e) {
-          log.error("Failed to release quota for requester list " + serializedRequesters, e);
-        }
-      }
-    }
-
-    private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
-      Integer currentCount;
-      if (user == null) {
-        return;
-      }
-      do {
-        currentCount = quotaMap.get(user);
-      } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
-    }
-
-    private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
-      for (String requester : requestersToDecreaseCount) {
-       decrementQuotaUsage(DagManagerThread.requesterToJobCount, requester);
-      }
-    }
-
     private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
       this.jobToDag.remove(dagNode);
       this.dagToJobs.get(dagId).remove(dagNode);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
new file mode 100644
index 000000000..58b212db2
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Manages the statically configured user quotas for both the proxy user (the user.to.proxy configuration) and the API requester(s).
+ * It is used by the dag manager to ensure that the number of currently running jobs does not exceed the quota. If the quota
+ * is exceeded, the execution fails without running on the underlying executor.
+ */
+@Slf4j
+public class UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> perUserQuota;
+  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+  private final int defaultQuota;
+
+  UserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);  // used when a user has no own entry
+    ImmutableMap.Builder<String, Integer> quotaOverrides = ImmutableMap.builder();
+    for (String entry : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+      String[] userAndQuota = entry.split(QUOTA_SEPERATOR);  // [0] is the user name, [1] is that user's quota
+      quotaOverrides.put(userAndQuota[0], Integer.parseInt(userAndQuota[1]));
+    }
+    this.perUserQuota = quotaOverrides.build();
+  }
+
+  /**
+   * Counts the dagNode against its proxy user and each distinct requester, then checks the statically configured quotas.
+   * @throws IOException if a quota is exceeded and onInit is false (the counts added by this call are rolled back first)
+   */
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException {
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the requester quota keys whose count was increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
+
+    if (proxyUser != null) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyQuotaIncrement since count starts at 0, and is negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null) {
+      List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
+          .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
+        // Track the map key rather than the bare name, so a rollback decrements the entry that was incremented
+        usersQuotaIncrement.add(DagManagerUtils.getUserQuotaKey(requester, dagNode));
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
+              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)+1-getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    // Throw the quota error only after every user has been checked, to avoid inconsistent job counts
+    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+      // roll back the increased counts in this block
+      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsageForUsers(usersQuotaIncrement);
+      runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+      throw new IOException(requesterMessage.toString());
+    }
+  }
+
+  /**
+   * Adds one to the job count stored under the user's quota key and reports the count seen before the increment.
+   * @return 0 when the count is left untouched (not the first attempt) or was 0 before this request;
+   *         the previous count (positive) when it was still below the user's quota;
+   *         the negated previous count when the quota had already been reached
+   * NOTE(review): a previous count of 0 with a quota of 0 also returns 0, which callers read as success - confirm
+   */
+  private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, Dag.DagNode<JobExecutionPlan> dagNode) {
+    String quotaKey = DagManagerUtils.getUserQuotaKey(user, dagNode);
+
+    // A job counts as running between retries, so only its first attempt may change the count
+    boolean firstAttempt = dagNode.getValue().getCurrentAttempts() == 1;
+    if (!firstAttempt) {
+      return 0;
+    }
+
+    // Compare-and-set retry loop: DagManagerThreads may update the count of the same user concurrently
+    Integer previousCount = quotaMap.get(quotaKey);
+    while (previousCount == null
+        ? quotaMap.putIfAbsent(quotaKey, 1) != null
+        : !quotaMap.replace(quotaKey, previousCount, previousCount + 1)) {
+      previousCount = quotaMap.get(quotaKey);
+    }
+
+    int countBeforeIncrement = (previousCount == null) ? 0 : previousCount;
+    if (countBeforeIncrement < getQuotaForUser(user)) {
+      return countBeforeIncrement;
+    }
+    // the increment just performed crossed the quota
+    return -countBeforeIncrement;
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user and each distinct requester corresponding to the provided {@link Dag.DagNode}.
+   * @return true if the dag was in the set of running dags and its quota was released; false if it was not tracked or the requesters could not be deserialized
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    if (val == null) {
+      return false;
+    }
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    if (proxyUser != null) {
+      decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
+    }
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    if (serializedRequesters != null) {
+      try {
+        // checkQuota counts each distinct requester name once, so release each distinct name once as well
+        for (String requester : RequesterService.deserialize(serializedRequesters).stream()
+            .map(ServiceRequester::getName).distinct().collect(Collectors.toList())) {
+          decrementQuotaUsage(requesterToJobCount, DagManagerUtils.getUserQuotaKey(requester, dagNode));
+        }
+      } catch (IOException e) {
+        log.error("Failed to release quota for requester list " + serializedRequesters, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Lowers the count stored under the given key by one; missing entries and zero counts are left unchanged. */
+  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
+    // A null key has nothing to release, so it is handled like a missing entry
+    Integer observedCount = (user == null) ? null : quotaMap.get(user);
+    // Retry until this decrement is applied or there is nothing left to decrement
+    while (observedCount != null && observedCount > 0 && !quotaMap.replace(user, observedCount, observedCount - 1)) {
+      observedCount = quotaMap.get(user);
+    }
+  }
+
+  /** Decrements the requester job count once for every entry of the given set;
+   *  each entry is passed to decrementQuotaUsage unchanged, as the map key. */
+  private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
+    requestersToDecreaseCount.forEach(requester -> decrementQuotaUsage(requesterToJobCount, requester));
+  }
+
+  private int getQuotaForUser(String user) {
+    return this.perUserQuota.getOrDefault(user, defaultQuota); // falls back to the default quota when the user has no entry
+  }
+
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b072185c8..9daad5fbc 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -29,10 +31,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -65,6 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
 public class DagManagerTest {
   private final String dagStateStoreDir = "/tmp/dagManagerTest/dagStateStore";
   private DagStateStore _dagStateStore;
+  private DagStateStore _failedDagStateStore;
   private JobStatusRetriever _jobStatusRetriever;
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
@@ -73,8 +78,10 @@ public class DagManagerTest {
   private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
+  private UserQuotaManager _gobblinServiceQuotaManager;
   private Set<String> failedDagIds;
   private static long START_SLA_DEFAULT = 15 * 60 * 1000;
+  private MetricContext metricContext;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -83,15 +90,18 @@ public class DagManagerTest {
         .withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
 
     this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
-    DagStateStore failedDagStateStore = new InMemoryDagStateStore();
+    this._failedDagStateStore = new InMemoryDagStateStore();
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
     this.resumeQueue = new LinkedBlockingQueue<>();
-    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
-    this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, failedDagStateStore, queue, cancelQueue,
-        resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
-        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT);
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    Config quotaConfig = ConfigFactory.empty()
+        .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1"));
+    this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+    this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
+        resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
+        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager);
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -110,6 +120,19 @@ public class DagManagerTest {
     this.failedDagIds = (Set<String>) failedDagIdsField.get(this._dagManagerThread);
   }
 
+  /**
+   * Build {@code numDags} single-node dags with ids "0" to "numDags - 1", each created for {@code proxyUser}.
+   * @return the dags as a list, in ascending id order.
+   */
+  static List<Dag<JobExecutionPlan>> buildDagList(int numDags, String proxyUser) throws URISyntaxException {
+    String failureOption = DagManager.FailureOption.FINISH_ALL_POSSIBLE.name();
+    List<Dag<JobExecutionPlan>> singleNodeDags = new ArrayList<>();
+    for (int dagId = 0; dagId < numDags; dagId++) {
+      singleNodeDags.add(buildDag(String.valueOf(dagId), System.currentTimeMillis(), failureOption, 1, proxyUser));
+    }
+    return singleNodeDags;
+  }
+
   /**
    * Create a {@link Dag <JobExecutionPlan>}.
    * @return a Dag.
@@ -122,6 +145,11 @@ public class DagManagerTest {
 
   static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes)
       throws URISyntaxException {
+    return buildDag(id, flowExecutionId, flowFailureOption, numNodes, "testUser"); // "testUser" is the default proxyUser
+  }
+
+  static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes, String proxyUser)
+      throws URISyntaxException {
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
     for (int i = 0; i < numNodes; i++) {
@@ -132,7 +160,8 @@ public class DagManagerTest {
           addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
           addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
           addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
-          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption).build();
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
       if ((i == 1) || (i == 2)) {
         jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0"));
       } else if (i == 3) {
@@ -757,6 +786,118 @@ public class DagManagerTest {
     Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
+  @Test (dependsOnMethods = "testDagManagerWithBadFlowSLAConfig")
+  public void testDagManagerQuotaExceeded() throws URISyntaxException, IOException {
+    // Two single-node dags for "user"; only the first one is queued before the first run
+    List<Dag<JobExecutionPlan>> userDags = buildDagList(2, "user");
+    this.queue.offer(userDags.get(0));
+
+    Config firstJobConfig = userDags.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Config secondJobConfig = userDags.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Long firstExecutionId = Long.valueOf(firstJobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    Long secondExecutionId = Long.valueOf(secondJobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    String running = String.valueOf(ExecutionStatus.RUNNING);
+
+    Iterator<JobStatus> flow0FirstStatus = getMockJobStatus("flow0", "group0", firstExecutionId, "job0", "group0", running);
+    Iterator<JobStatus> flow1Status = getMockJobStatus("flow1", "group1", secondExecutionId, "job0", "group1",
+        String.valueOf(ExecutionStatus.FAILED));
+    // Cleanup the running job that is scheduled normally
+    Iterator<JobStatus> flow0SecondStatus = getMockJobStatus("flow0", "group0", firstExecutionId, "job0", "group0", running);
+    Iterator<JobStatus> flow0FinalStatus = getMockJobStatus("flow0", "group0", firstExecutionId, "job0", "group0",
+        String.valueOf(ExecutionStatus.COMPLETE));
+
+    // Successive status lookups for flow0 return RUNNING, RUNNING, then COMPLETE
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(flow0FirstStatus)
+        .thenReturn(flow0SecondStatus)
+        .thenReturn(flow0FinalStatus);
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(flow1Status);
+
+    this._dagManagerThread.run();
+    // dag will not be processed due to exceeding the quota, will log a message and exit out without adding it to dags
+    this.queue.offer(userDags.get(1));
+    this._dagManagerThread.run();
+
+    String userCounterName = MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, "user");
+    SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+    // The per-user counter is expected to be 1: the second dag must not have been counted
+    Assert.assertEquals(allCounters.get(userCounterName).getCount(), 1);
+
+    this._dagManagerThread.run(); // cleanup
+  }
+
+  @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
+  public void testQuotaDecrement() throws URISyntaxException, IOException {
+    // Three single-node dags for "user", whose quota is set to 1 in setUp; dags beyond the first must not be counted
+    List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user");
+    //Add the first two dags to the queue of dags
+    this.queue.offer(dagList.get(0));
+    this.queue.offer(dagList.get(1));
+    Config jobConfig0 = dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Config jobConfig1 = dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Config jobConfig2 = dagList.get(2).getNodes().get(0).getValue().getJobSpec().getConfig();
+
+    Iterator<JobStatus> jobStatusIterator0 =
+        getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus("flow1", "group1", Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group1", String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus("flow2", "group2", Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(jobStatusIterator0)
+        .thenReturn(jobStatusIterator2)
+        .thenReturn(jobStatusIterator4);
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(jobStatusIterator1);
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(jobStatusIterator3);
+
+    this._dagManagerThread.run();
+
+    SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
+    // Test case where a job that exceeded a quota would cause a double decrement after fixing the proxy user name, allowing for more jobs to run
+    this.queue.offer(dagList.get(2));
+    this._dagManagerThread.run();
+    // Assert that running dag metrics are only counted once
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
+
+    this._dagManagerThread.run(); // cleanup
+    // After the cleanup run the per-user counter must be back to zero
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 0);
+  }
+
 
   @AfterClass
   public void cleanUp() throws Exception {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
new file mode 100644
index 000000000..2ac0acc79
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.util.List;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/** Tests for the per-user quota checks of {@link UserQuotaManager}. */
+public class UserQuotaManagerTest {
+
+  UserQuotaManager _quotaManager;
+
+  @BeforeClass
+  public void setUp() {
+    // Both quotas go into one comma-separated value: chaining a second withValue() on the same key replaces the first
+    Config quotaConfig = ConfigFactory.empty()
+        .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1,user2:1"));
+    this._quotaManager = new UserQuotaManager(quotaConfig);
+  }
+
+  // Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter
+  @Test
+  public void testExceedsQuotaOnStartup() throws Exception {
+    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user");
+    // Ensure that the current attempt is 1, normally done by DagManager
+    dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+    dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
+    // Should not be throwing the exception
+    this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
+
+    // TODO: add verification when adding a public method for getting the current count and quota per user
+  }
+
+  // Tests that a second dag for a user with quota 1 is rejected with an IOException when not in startup mode
+  @Test
+  public void testExceedsQuotaThrowsException() throws Exception {
+    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user2");
+    // Ensure that the current attempt is 1, normally done by DagManager
+    dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+    dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    Assert.assertThrows(IOException.class,
+        () -> this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false));
+  }
+}