You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/03 20:01:05 UTC

[gobblin] branch master updated: [GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c3f6d1fc [GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)
0c3f6d1fc is described below

commit 0c3f6d1fc5dbe18c9ffecad9db459cf48465812f
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Oct 3 15:00:56 2022 -0500

    [GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)
    
    * avoid double quota increase for adhoc flows
    
    * corrected some config names
    
    * address review comments, made runningDagIds map implementation specific
    
    * address review comments
    
    * address review comments
    
    * fix checkstyle
    
    * make checking quota for multiple dag nodes atomic
    
    * fix unit test
    
    * remove unused code
    
    * remove unused imports
    
    * address review comments
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  11 +-
 .../runtime/spec_store/MysqlBaseSpecStore.java     |   3 +-
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  20 ++
 .../service/modules/flow/MultiHopFlowCompiler.java |   6 +
 .../orchestration/AbstractUserQuotaManager.java    | 158 +-----------
 .../service/modules/orchestration/DagManager.java  |   7 +-
 .../orchestration/InMemoryUserQuotaManager.java    | 162 +++++++++++-
 .../orchestration/MysqlUserQuotaManager.java       | 283 ++++++++++++++++++---
 .../modules/orchestration/UserQuotaManager.java    |   2 +-
 .../scheduler/GobblinServiceJobScheduler.java      |   7 +-
 .../InMemoryUserQuotaManagerTest.java              |  23 +-
 .../orchestration/MysqlUserQuotaManagerTest.java   |  78 ++++--
 12 files changed, 529 insertions(+), 231 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 55b8282fe..d9c251e69 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -32,7 +32,9 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
   public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
-  public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";
+
+  public static final String GOBBLIN_SERVICE_ADHOC_FLOW = GOBBLIN_SERVICE_PREFIX + "adhoc.flow";
+
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
   public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
@@ -146,6 +148,13 @@ public class ServiceConfigKeys {
   public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";
   public static final String DEFAULT_QUOTA_MANAGER = "org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";
 
+  public static final String QUOTA_STORE_DB_TABLE_KEY = "quota.store.db.table";
+  public static final String DEFAULT_QUOTA_STORE_DB_TABLE = "quota_table";
+
+  public static final String RUNNING_DAG_IDS_DB_TABLE_KEY = "running.dag.ids.store.db.table";
+  public static final String DEFAULT_RUNNING_DAG_IDS_DB_TABLE = "running_dag_ids";
+
+
   // Group Membership authentication service
   public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
   public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index b2369588f..3e3a78201 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 
-import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -133,7 +132,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   }
 
 
-  protected final DataSource dataSource;
+  protected final BasicDataSource dataSource;
   protected final String tableName;
   private final URI specStoreURI;
   protected final SpecSerDe specSerDe;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index f910e9e4e..976d12650 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import javax.inject.Inject;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +92,11 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   @Setter
   protected boolean active;
 
+  private boolean warmStandbyEnabled;
+
+  @Inject
+  UserQuotaManager userQuotaManager;
+
   public BaseFlowToJobSpecCompiler(Config config){
     this(config,true);
   }
@@ -119,6 +126,8 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
       this.dataAuthorizationTimer = Optional.absent();
     }
 
+    this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+
     this.topologySpecMap = Maps.newConcurrentMap();
     this.config = config;
 
@@ -181,6 +190,17 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
 
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+    if (this.warmStandbyEnabled &&
+        (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+      try {
+        userQuotaManager.checkQuota(dag.getStartNodes());
+        flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     // If dag is null then a compilation error has occurred
     if (dag != null && !dag.isEmpty()) {
       response = dag.toString();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 3e6eb13dc..dc3d0bc47 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -281,6 +281,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     Instrumented.markMeter(flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
 
+    if (Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW))) {
+      for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getStartNodes()) {
+        dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
+      }
+    }
+
     return jobExecutionPlanDag;
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index fad079bf0..b92d6d610 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -18,10 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
@@ -30,10 +27,6 @@ import lombok.AllArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -49,8 +42,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> perUserQuota;
   private final Map<String, Integer> perFlowGroupQuota;
-  // TODO : we might want to make this field implementation specific to be able to decide if the dag is already running or have been accepted
-  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+
   private final int defaultQuota;
 
   public AbstractUserQuotaManager(Config config) {
@@ -69,155 +61,13 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
     this.perFlowGroupQuota = flowGroupMapBuilder.build();
   }
 
-  // Implementations should return the current count and increase them by one
-  abstract int incrementJobCount(String key, CountType countType) throws IOException;
-
-  abstract void decrementJobCount(String key, CountType countType) throws IOException;
-
-  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
-
-    // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
-      // roll back the increased counts in this block
-      rollbackIncrements(dagNode);
-      throw new QuotaExceededException(quotaCheck.requesterMessage);
-    }
-  }
-
-  private void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
-    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
-    List<String> usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
-
-    decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
-    decrementQuotaUsageForUsers(usersQuotaIncrement);
-    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
-    runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
-  }
-
-  protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
-    // Dag is already being tracked, no need to double increment for retries and multihop flows
-    if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) {
-      return quotaCheck;
-    } else {
-      runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString());
-    }
-
-    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
-    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
-        ConfigurationKeys.FLOW_GROUP_KEY, "");
-    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-    StringBuilder requesterMessage = new StringBuilder();
-
-    boolean proxyUserCheck;
-
-    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
-      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
-          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
-      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
-      quotaCheck.setProxyUserCheck(proxyUserCheck);
-      if (!proxyUserCheck) {
-        // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment
-        requesterMessage.append(String.format(
-            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
-            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
-      }
-    }
-
-    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
-    boolean requesterCheck = true;
-
-    if (dagNode.getValue().getCurrentAttempts() <= 1) {
-      List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
-      for (String requester : uniqueRequesters) {
-        int userQuotaIncrement = incrementJobCountAndCheckQuota(
-            DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT);
-        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
-        requesterCheck = requesterCheck && thisRequesterCheck;
-        quotaCheck.setRequesterCheck(requesterCheck);
-        if (!thisRequesterCheck) {
-          requesterMessage.append(String.format(
-              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
-              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
-        }
-      }
-    }
-
-    boolean flowGroupCheck;
-
-    if (dagNode.getValue().getCurrentAttempts() <= 1) {
-      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
-          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
-      flowGroupCheck = flowGroupQuotaIncrement >= 0;
-      quotaCheck.setFlowGroupCheck(flowGroupCheck);
-      if (!flowGroupCheck) {
-        requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
-            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
-            Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
-      }
-    }
-
-    quotaCheck.setRequesterMessage(requesterMessage.toString());
-
-    return quotaCheck;
-  }
-
-  /**
-   * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
-   * Returns true if the dag existed in the set of running dags and was removed successfully
-   */
-  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
-    if (!val) {
-      return false;
-    }
-
-    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
-    if (proxyUser != null) {
-      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-      decrementJobCount(proxyUserKey, CountType.USER_COUNT);
-    }
-
-    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
-        ConfigurationKeys.FLOW_GROUP_KEY, "");
-    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
-
-    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
-    try {
-      for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
-        String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
-        decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
-      }
-    } catch (IOException e) {
-      log.error("Failed to release quota for requester list " + serializedRequesters, e);
-      return false;
-    }
-
-    return true;
-  }
-
-  private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException {
-    int currentCount = incrementJobCount(key, countType);
-    if (currentCount >= keyQuota) {
-      return -currentCount;
-    } else {
-      return currentCount;
-    }
-  }
-
-  private void decrementQuotaUsageForUsers(List<String> requestersToDecreaseCount) throws IOException {
-    for (String requester : requestersToDecreaseCount) {
-      decrementJobCount(requester, CountType.REQUESTER_COUNT);
-    }
-  }
+  abstract boolean containsDagId(String dagId) throws IOException;
 
-  private int getQuotaForUser(String user) {
+  int getQuotaForUser(String user) {
     return this.perUserQuota.getOrDefault(user, defaultQuota);
   }
 
-  private int getQuotaForFlowGroup(String flowGroup) {
+  int getQuotaForFlowGroup(String flowGroup) {
     return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index fcb2ff4c1..321f93b9a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -963,7 +964,11 @@ public class DagManager extends AbstractIdleService {
       // Run this spec on selected executor
       SpecProducer<Spec> producer;
       try {
-        quotaManager.checkQuota(dagNode);
+        if (!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
+            ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
+          quotaManager.checkQuota(Collections.singleton(dagNode));
+        }
+
         producer = DagManagerUtils.getSpecProducer(dagNode);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
index 438e86d20..f4c9ab023 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
@@ -21,13 +21,18 @@ import com.typesafe.config.Config;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
 
@@ -42,9 +47,153 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
   private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
   private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
 
+  private final Set<String> runningDagIds;
+
   @Inject
   public InMemoryUserQuotaManager(Config config) {
     super(config);
+    this.runningDagIds = ConcurrentHashMap.newKeySet();;
+  }
+
+  protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+    // Dag is already being tracked, no need to double increment for retries and multihop flows
+    if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+      return quotaCheck;
+    } else {
+      addDagId(DagManagerUtils.generateDagId(dagNode).toString());
+    }
+
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    StringBuilder requesterMessage = new StringBuilder();
+
+    boolean proxyUserCheck;
+
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
+      quotaCheck.setProxyUserCheck(proxyUserCheck);
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        quotaCheck.setRequesterCheck(requesterCheck);
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
+      }
+    }
+
+    quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+    return quotaCheck;
+  }
+
+  protected void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
+    List<String> usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
+
+    decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
+    decrementQuotaUsageForUsers(usersQuotaIncrement);
+    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+    removeDagId(DagManagerUtils.generateDagId(dagNode).toString());
+  }
+
+  private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException {
+    int currentCount = incrementJobCount(key, countType);
+    if (currentCount >= keyQuota) {
+      return -currentCount;
+    } else {
+      return currentCount;
+    }
+  }
+
+  private void decrementQuotaUsageForUsers(List<String> requestersToDecreaseCount) throws IOException {
+    for (String requester : requestersToDecreaseCount) {
+      decrementJobCount(requester, CountType.REQUESTER_COUNT);
+    }
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
+   * Returns true if the dag existed in the set of running dags and was removed successfully
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    boolean val = removeDagId(DagManagerUtils.generateDagId(dagNode).toString());
+    if (!val) {
+      return false;
+    }
+
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    if (proxyUser != null) {
+      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementJobCount(proxyUserKey, CountType.USER_COUNT);
+    }
+
+    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    try {
+      for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+        String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+        decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
+      }
+    } catch (IOException e) {
+      log.error("Failed to release quota for requester list " + serializedRequesters, e);
+      return false;
+    }
+
+    return true;
+  }
+
+  void addDagId(String dagId) {
+    this.runningDagIds.add(dagId);
+  }
+
+  @Override
+  boolean containsDagId(String dagId) {
+    return this.runningDagIds.contains(dagId);
+  }
+
+  boolean removeDagId(String dagId) {
+    return this.runningDagIds.remove(dagId);
   }
 
   public void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException {
@@ -58,6 +207,17 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
     }
   }
 
+  public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+      QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
+      if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
+        // roll back the increased counts in this block
+        rollbackIncrements(dagNode);
+        throw new QuotaExceededException(quotaCheck.requesterMessage);
+      }
+    }
+  }
+
   private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
     Integer currentCount;
     // Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user
@@ -86,7 +246,6 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
     }
   }
 
-  @Override
   int incrementJobCount(String user, CountType countType) throws IOException {
     switch (countType) {
       case USER_COUNT:
@@ -100,7 +259,6 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
     }
   }
 
-  @Override
   void decrementJobCount(String user, CountType countType) throws IOException {
     switch (countType) {
       case USER_COUNT:
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 78f15cfba..1cf49c83f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 
+import java.util.List;
 import org.apache.commons.dbcp.BasicDataSource;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,7 +36,9 @@ import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
@@ -47,12 +50,28 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 @Singleton
 public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
-  private final MysqlQuotaStore mysqlStore;
+  public final MysqlQuotaStore quotaStore;
+  public final RunningDagIdsStore runningDagIds;
+
 
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
-    this.mysqlStore = createQuotaStore(config);
+    this.quotaStore = createQuotaStore(config);
+    this.runningDagIds = createRunningDagStore(config);
+  }
+
+  void addDagId(Connection connection, String dagId) throws IOException {
+    this.runningDagIds.add(connection, dagId);
+  }
+
+  @Override
+  boolean containsDagId(String dagId) throws IOException {
+    return this.runningDagIds.contains(dagId);
+  }
+
+  boolean removeDagId(Connection connection, String dagId) throws IOException {
+    return this.runningDagIds.remove(connection, dagId);
   }
 
   // This implementation does not need to update quota usage when the service restarts or it's leadership status changes
@@ -60,42 +79,191 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
   }
 
   @Override
-  int incrementJobCount(String user, CountType countType) throws IOException {
-    try {
-      return this.mysqlStore.increaseCount(user, countType);
+  public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
+    try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+      connection.setAutoCommit(false);
+
+      for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+        QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+        if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
+          connection.rollback();
+          throw new QuotaExceededException(quotaCheck.requesterMessage);
+        }
+      }
+      connection.commit();
     } catch (SQLException e) {
       throw new IOException(e);
     }
   }
 
-  @Override
-  void decrementJobCount(String user, CountType countType) throws IOException {
+  int incrementJobCount(Connection connection, String user, CountType countType) throws IOException, SQLException {
+    return this.quotaStore.increaseCount(connection, user, countType);
+  }
+
+  void decrementJobCount(Connection connection,String user, CountType countType) throws IOException, SQLException {
+      this.quotaStore.decreaseCount(connection, user, countType);
+  }
+
+  protected QuotaCheck increaseAndCheckQuota(Connection connection, Dag.DagNode<JobExecutionPlan> dagNode)
+      throws SQLException, IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+    StringBuilder requesterMessage = new StringBuilder();
+
+    // Dag is already being tracked, no need to double increment for retries and multihop flows
+    if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+      return quotaCheck;
+    } else {
+      addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+    }
+
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    boolean proxyUserCheck;
+
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
+      quotaCheck.setProxyUserCheck(proxyUserCheck);
+      if (!proxyUserCheck) {
+        // add 1 to |proxyQuotaIncrement| since its magnitude is the count before the increment
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(connection, DagManagerUtils.getUserQuotaKey(requester, dagNode),
+            getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        quotaCheck.setRequesterCheck(requesterCheck);
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format("Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
+      }
+    }
+
+    quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+    return quotaCheck;
+  }
+
+  protected int incrementJobCountAndCheckQuota(Connection connection, String key, int keyQuota, CountType countType)
+      throws IOException, SQLException {
+    int currentCount = incrementJobCount(connection, key, countType);
+    if (currentCount >= keyQuota) {
+      return -currentCount;
+    } else {
+      return currentCount;
+    }
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user, flow group and requesters corresponding to the provided {@link Dag.DagNode}.
+   * Returns true if the dag existed in the set of running dags and was removed successfully.
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    Connection connection;
     try {
-      this.mysqlStore.decreaseCount(user, countType);
+      connection = this.quotaStore.dataSource.getConnection();
+      connection.setAutoCommit(false);
     } catch (SQLException e) {
       throw new IOException(e);
     }
+
+    try {
+      boolean val = removeDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+      if (!val) {
+        return false;
+      }
+
+      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+      if (proxyUser != null) {
+        String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+        decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
+      }
+
+      String flowGroup =
+          ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
+      decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+
+      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+      try {
+        for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+          String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+          decrementJobCount(connection, requesterKey, CountType.REQUESTER_COUNT);
+        }
+      } catch (IOException e) {
+        log.error("Failed to release quota for requester list " + serializedRequesters, e);
+        return false;
+      }
+      connection.commit();
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      try {
+        connection.close();
+      } catch (SQLException ex) {
+        throw new IOException(ex);
+      }
+    }
+
+    return true;
   }
 
   @VisibleForTesting
   int getCount(String name, CountType countType) throws IOException {
-    return this.mysqlStore.getCount(name, countType);
+    return this.quotaStore.getCount(name, countType);
   }
 
   /**
-   * Creating an instance of StateStore.
+   * Creating an instance of MysqlQuotaStore.
    */
   protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
-    String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
-        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+    String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
+        ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);
+
+    BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
+
+    return new MysqlQuotaStore(basicDataSource, quotaStoreTableName);
+  }
+
+  protected RunningDagIdsStore createRunningDagStore(Config config) throws IOException {
+    String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
+        ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);
 
     BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
 
-    return new MysqlQuotaStore(basicDataSource, stateStoreTableName);
+    return new RunningDagIdsStore(basicDataSource, quotaStoreTableName);
   }
 
   static class MysqlQuotaStore {
-    protected final DataSource dataSource;
+    protected final BasicDataSource dataSource;
     final String tableName;
     private final String GET_USER_COUNT;
     private final String GET_REQUESTER_COUNT;
@@ -133,7 +301,9 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
       try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) {
         createStatement.executeUpdate();
       } catch (SQLException e) {
-        throw new IOException("Failure creation table " + tableName, e);
+        log.warn("Failure in creating table {}. Validation query is set to {} Exception is {}",
+            tableName, this.dataSource.getValidationQuery(), e);
+        throw new IOException(e);
       }
     }
 
@@ -158,13 +328,11 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
       }
     }
 
-    public int increaseCount(String name, CountType countType) throws IOException, SQLException {
-      Connection connection = dataSource.getConnection();
-      connection.setAutoCommit(false);
-
+    public int increaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException {
       String selectStatement;
       String increaseStatement;
 
+
       switch(countType) {
         case USER_COUNT:
           selectStatement = GET_USER_COUNT;
@@ -189,27 +357,19 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
         statement2.setString(1, name);
         rs = statement1.executeQuery();
         statement2.executeUpdate();
-        connection.commit();
         if (rs != null && rs.next()) {
           return rs.getInt(1);
         } else {
           return 0;
         }
-      } catch (SQLException e) {
-        connection.rollback();
-        throw new IOException("Failure increasing count for user/flowGroup " + name, e);
       } finally {
         if (rs != null) {
           rs.close();
         }
-        connection.close();
       }
     }
 
-    public void decreaseCount(String name, CountType countType) throws IOException, SQLException {
-      Connection connection = dataSource.getConnection();
-      connection.setAutoCommit(false);
-
+    public void decreaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException {
       String selectStatement;
       String decreaseStatement;
 
@@ -241,18 +401,75 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
         rs = statement1.executeQuery();
         statement2.executeUpdate();
         statement3.executeUpdate();
-        connection.commit();
         if (rs != null && rs.next() && rs.getInt(1) == 0) {
           log.warn("Decrement job count was called for " + name + " when the count was already zero/absent.");
         }
-      } catch (SQLException e) {
-        connection.rollback();
-        throw new IOException("Failure decreasing count from user/flowGroup " + name, e);
       } finally {
         if (rs != null) {
           rs.close();
         }
-        connection.close();
+      }
+    }
+  }
+
+  static class RunningDagIdsStore {
+    protected final DataSource dataSource;
+    final String tableName;
+    private final String CONTAINS_DAG_ID;
+    private final String ADD_DAG_ID;
+    private final String REMOVE_DAG_ID;
+
+    public RunningDagIdsStore(BasicDataSource dataSource, String tableName)
+        throws IOException {
+      this.dataSource = dataSource;
+      this.tableName = tableName;
+
+      CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE dagId = ?)" ;
+      ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) ";
+      REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?";
+
+      String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " (dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+          + "PRIMARY KEY (dagId), UNIQUE INDEX ind (dagId))";
+      try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) {
+        createStatement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure creation table " + tableName, e);
+      }
+    }
+
+    /**
+     * returns true if the DagID is already present in the running dag store
+     */
+    @VisibleForTesting
+    boolean contains(String dagId) throws IOException {
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement queryStatement = connection.prepareStatement(CONTAINS_DAG_ID)) {
+        queryStatement.setString(1, dagId);
+        try (ResultSet rs = queryStatement.executeQuery()) {
+          rs.next();
+          return rs.getBoolean(1);
+        }
+      } catch (Exception e) {
+        throw new IOException("Could not find if the dag " + dagId + " is already running.", e);
+      }
+    }
+
+    public void add(Connection connection, String dagId) throws IOException {
+      try (PreparedStatement statement = connection.prepareStatement(ADD_DAG_ID)) {
+        statement.setString(1, dagId);
+        statement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure adding dag " + dagId, e);
+      }
+    }
+
+    public boolean remove(Connection connection, String dagId) throws IOException {
+      try (PreparedStatement statement = connection.prepareStatement(REMOVE_DAG_ID)) {
+        statement.setString(1, dagId);
+        int count = statement.executeUpdate();
+        return count == 1;
+      } catch (SQLException e) {
+        throw new IOException("Could not remove dag " + dagId, e);
       }
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index 752956d11..dff5e0a79 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -42,7 +42,7 @@ public interface UserQuotaManager {
    * It also increases the quota usage for proxy user, requester and the flowGroup of the given DagNode by one.
    * @throws QuotaExceededException if the quota is exceeded
    */
-  void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+  void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNode) throws IOException;
 
   /**
    * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 863144a6f..c997f0603 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -250,9 +250,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     Properties properties = spec.getConfigAsProperties();
     properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
     Config config = ConfigFactory.parseProperties(properties);
-    FlowSpec flowSpec = new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
+    return new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
         spec.getTemplateURIs(), spec.getChildSpecs());
-    return flowSpec;
   }
 
   @Override
@@ -332,10 +331,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     // Check quota limits against run immediately flows or adhoc flows before saving the schedule
     // In warm standby mode, this quota check will happen on restli API layer when we accept the flow
     if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+      // This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true).
       if (quotaManager.isPresent()) {
         // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
         try {
-          quotaManager.get().checkQuota(dag.getNodes().get(0));
+          quotaManager.get().checkQuota(dag.getStartNodes());
+          ((FlowSpec) addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
index 5e63a0f00..2a0285aa9 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
@@ -20,6 +20,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -61,9 +62,9 @@ public class InMemoryUserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+    this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
+      this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0)));
     });
   }
 
@@ -76,7 +77,7 @@ public class InMemoryUserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+    this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
     Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
     Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
   }
@@ -91,9 +92,9 @@ public class InMemoryUserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+    this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
+      this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0)));
     });
   }
 
@@ -115,20 +116,20 @@ public class InMemoryUserQuotaManagerTest {
     dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
     dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dag1.getNodes().get(0));
-    this._quotaManager.checkQuota(dag2.getNodes().get(0));
+    this._quotaManager.checkQuota(Collections.singleton(dag1.getNodes().get(0)));
+    this._quotaManager.checkQuota(Collections.singleton(dag2.getNodes().get(0)));
 
     // Should fail due to user quota
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dag3.getNodes().get(0));
+      this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0)));
     });
     // Should fail due to flowgroup quota
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dag4.getNodes().get(0));
+      this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0)));
     });
     // should pass due to quota being released
     this._quotaManager.releaseQuota(dag2.getNodes().get(0));
-    this._quotaManager.checkQuota(dag3.getNodes().get(0));
-    this._quotaManager.checkQuota(dag4.getNodes().get(0));
+    this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0)));
+    this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0)));
   }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index 4bdd45597..a8931738d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -36,6 +38,7 @@ public class MysqlUserQuotaManagerTest {
   private static final String TABLE = "quotas";
   private static final String PROXY_USER = "abora";
   private MysqlUserQuotaManager quotaManager;
+  public static int INCREMENTS = 1000;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -52,35 +55,63 @@ public class MysqlUserQuotaManagerTest {
   }
 
   @Test
+  public void testRunningDagStore() throws Exception {
+    String dagId = DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 1).getNodes().get(0)).toString();
+    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+    Assert.assertFalse(this.quotaManager.containsDagId(dagId));
+    this.quotaManager.addDagId(connection, dagId);
+    connection.commit();
+    Assert.assertTrue(this.quotaManager.containsDagId(dagId));
+    Assert.assertTrue(this.quotaManager.removeDagId(connection, dagId));
+    connection.commit();
+    Assert.assertFalse(this.quotaManager.containsDagId(dagId));
+    Assert.assertFalse(this.quotaManager.removeDagId(connection, dagId));
+    connection.commit();
+    connection.close();
+  }
+
+    @Test
   public void testIncreaseCount() throws Exception {
-    int prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+    int prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
     Assert.assertEquals(prevCount, 0);
 
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
     Assert.assertEquals(prevCount, 1);
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 2);
 
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
     Assert.assertEquals(prevCount, 0);
 
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
     Assert.assertEquals(prevCount, 1);
+    connection.close();
   }
 
   @Test(dependsOnMethods = "testIncreaseCount")
   public void testDecreaseCount() throws Exception {
-    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 1);
 
-    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
 
-    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
 
-    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
-    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
     // on count reduced to zero, the row should get deleted and the get call should return -1 instead of 0.
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
   }
@@ -95,14 +126,15 @@ public class MysqlUserQuotaManagerTest {
     @Override
     public void run() {
       int i = 0;
-      while (i++ < 1000) {
-        try {
+      while (i++ < INCREMENTS) {
+        try (Connection connection = MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection();) {
           if (increaseOrDecrease) {
-            MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+            MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
           } else {
-            MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+            MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
           }
-        } catch (IOException e) {
+          connection.commit();
+        } catch (IOException | SQLException e) {
           Assert.fail("Thread got an exception.", e);
         }
       }
@@ -111,14 +143,13 @@ public class MysqlUserQuotaManagerTest {
 
   @Test(dependsOnMethods = "testDecreaseCount")
   public void testConcurrentChanges() throws IOException, InterruptedException {
-    Runnable increaseCountRunnable = new ChangeCountRunnable(true);
-    Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
-    Thread thread1 = new Thread(increaseCountRunnable);
-    Thread thread2 = new Thread(increaseCountRunnable);
-    Thread thread3 = new Thread(increaseCountRunnable);
-    Thread thread4 = new Thread(decreaseCountRunnable);
-    Thread thread5 = new Thread(decreaseCountRunnable);
-    Thread thread6 = new Thread(decreaseCountRunnable);
+    int numOfThreads = 3;
+    Thread thread1 = new Thread(new ChangeCountRunnable(true));
+    Thread thread2 = new Thread(new ChangeCountRunnable(true));
+    Thread thread3 = new Thread(new ChangeCountRunnable(true));
+    Thread thread4 = new Thread(new ChangeCountRunnable(false));
+    Thread thread5 = new Thread(new ChangeCountRunnable(false));
+    Thread thread6 = new Thread(new ChangeCountRunnable(false));
 
     thread1.start();
     thread2.start();
@@ -126,7 +157,8 @@ public class MysqlUserQuotaManagerTest {
     thread1.join();
     thread2.join();
     thread3.join();
-    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT),
+        INCREMENTS * 3);
     thread4.start();
     thread5.start();
     thread6.start();