You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/25 20:49:05 UTC

[gobblin] branch master updated: [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ce1e65aa [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)
9ce1e65aa is described below

commit 9ce1e65aa36674742877b5aa2083412d85b5764f
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 25 13:49:01 2022 -0700

    [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)
    
    * Add e2e tests and set http response code for quota exceeded
    
    * cleanup
    
    * Fix checkstyle test
    
    * Improve guard against schedule change if quota is exceeded
    
    * Fix bug relating to exception propagation and scheduler not checking quota due to current attempt number
    
    * Address review comments
    
    * Refactor based on review feedback
    
    * Fix test
    
    * Cleanup around handling responses from callbacks in GaaS API
    
    * Fix checkstyle
    
    * catch quotaexceededexception instead of checking type explicitly
    
    * Log other errors and throw 500
    
    * Fix checkstyle dead store
    
    * Fix checkstyle again
---
 .../service/FlowConfigResourceLocalHandler.java    | 22 ++++-
 .../service/FlowConfigV2ResourceLocalHandler.java  | 20 ++++-
 .../gobblin/runtime/api/MutableSpecCatalog.java    |  2 +-
 .../gobblin/runtime/api/SpecCatalogListener.java   |  3 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 23 +++---
 .../runtime/spec_catalog/FlowCatalogTest.java      | 51 +++++++++---
 .../service/modules/core/GitConfigMonitor.java     |  2 +-
 .../modules/core/GobblinServiceGuiceModule.java    |  3 +
 .../modules/core/GobblinServiceManager.java        |  5 ++
 .../modules/orchestration/DagManagerUtils.java     | 17 ++++
 .../modules/orchestration/UserQuotaManager.java    | 21 +++--
 .../scheduler/GobblinServiceJobScheduler.java      | 22 ++++-
 .../gobblin/service/GobblinServiceManagerTest.java | 23 +++++-
 .../modules/orchestration/OrchestratorTest.java    |  2 +-
 .../orchestration/UserQuotaManagerTest.java        |  2 -
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 94 ++++++++++++++++++++--
 .../gobblin/exception/QuotaExceededException.java  | 27 +++++++
 .../gobblin/util/callbacks/CallbackResult.java     |  3 +-
 18 files changed, 287 insertions(+), 55 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 8430acf1e..7e91c9d1a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
@@ -133,7 +134,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     if (!flowConfig.hasSchedule() && this.flowCatalog.exists(flowSpec.getUri())) {
       return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT);
     } else {
-      this.flowCatalog.put(flowSpec, triggerListener);
+      try {
+        this.flowCatalog.put(flowSpec, triggerListener);
+      } catch (QuotaExceededException e) {
+        throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+      } catch (Throwable e) {
+        // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+        log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
+        throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+      }
       return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED);
     }
   }
@@ -168,8 +177,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
       flowConfig = originalFlowConfig;
     }
-
-    this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
+    try {
+      this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
+    } catch (QuotaExceededException e) {
+      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+    } catch (Throwable e) {
+      // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+      log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+    }
     return new UpdateResponse(HttpStatus.S_200_OK);
   }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b7249c1a2..d2bb55743 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -36,9 +36,12 @@ import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+
 @Slf4j
 public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler {
 
@@ -60,9 +63,9 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     }
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    FlowStatusId flowStatusId = new FlowStatusId()
-        .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
-        .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+    FlowStatusId flowStatusId =
+        new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+                          .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
     if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
       flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
     } else {
@@ -76,7 +79,16 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
           "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
     }
 
-    Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
+    Map<String, AddSpecResponse> responseMap;
+    try {
+      responseMap = this.flowCatalog.put(flowSpec, triggerListener);
+    } catch (QuotaExceededException e) {
+        throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+    } catch (Throwable e) {
+      // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+      log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+    }
     HttpStatus httpStatus;
 
     if (flowConfig.hasExplain() && flowConfig.isExplain()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 3c1573a2e..e6c81f3aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -48,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
    * on adding a {@link Spec} to the {@link SpecCatalog}. The key for each entry is the name of the {@link SpecCatalogListener}
    * and the value is the result of the the action taken by the listener returned as an instance of {@link AddSpecResponse}.
    * */
-  Map<String, AddSpecResponse> put(Spec spec);
+  Map<String, AddSpecResponse> put(Spec spec) throws Throwable;
 
   /**
    * Removes an existing {@link Spec} with the given URI.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 67f2e39ca..6bf61a1e6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -48,7 +48,8 @@ public interface SpecCatalogListener {
       _addedSpec = addedSpec;
     }
 
-    @Override public AddSpecResponse apply(SpecCatalogListener listener) {
+    @Override
+     public AddSpecResponse apply(SpecCatalogListener listener) {
       return listener.onAddSpec(_addedSpec);
     }
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index c9c17b027..f459c9588 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -342,7 +342,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
    * @param triggerListener True if listeners should be notified.
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
-  public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
+  public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) throws Throwable {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
     FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
@@ -355,13 +355,21 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
 
     if (triggerListener) {
       AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
-      // If flow fails compilation, the result will have a non-empty string with the error
       for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
+      // If flow fails compilation, the result will have a non-empty string with the error
+      if (response.getValue().getFailures().size() > 0) {
+        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
+          throw entry.getValue().getError().getCause();
+        }
+        return responseMap;
+      }
     }
+    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
 
-    if (isCompileSuccessful(responseMap)) {
+    // Check that the flow configuration is valid and matches to a corresponding edge
+    if (isCompileSuccessful(schedulerResponse.getValue())) {
       synchronized (syncObject) {
         try {
           if (!flowSpec.isExplain()) {
@@ -384,19 +392,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     return responseMap;
   }
 
-  public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
-    // If we cannot get the response from the scheduler, assume that the flow failed compilation
-    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
-        ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
-    return isCompileSuccessful(addSpecResponse.getValue());
-  }
-
   public static boolean isCompileSuccessful(String dag) {
     return dag != null && !dag.contains(ConfigException.class.getSimpleName());
   }
 
   @Override
-  public Map<String, AddSpecResponse> put(Spec spec) {
+  public Map<String, AddSpecResponse> put(Spec spec) throws Throwable {
     return put(spec, true);
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 319a0f384..f79d8501a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -21,6 +21,7 @@ import com.google.common.base.Optional;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.io.File;
 import java.net.URI;
 import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -105,19 +107,24 @@ public class FlowCatalogTest {
     return initFlowSpec(specStore, uri, "flowName");
   }
 
-  /**
-   * Create FLowSpec with specified URI and SpecStore location.
-   */
+    /**
+     * Create FLowSpec with specified URI and SpecStore location.
+     */
   public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
+    return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
+  }
+
+  public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) {
     Properties properties = new Properties();
     properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+    properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
     properties.put("job.name", flowName);
-    properties.put("job.group", flowName);
+    properties.put("job.group", flowGroup);
     properties.put("specStore.fs.dir", specStore);
     properties.put("specExecInstance.capabilities", "source:destination");
     properties.put("job.schedule", "0 0 0 ? * * 2050");
-    Config config = ConfigUtils.propertiesToConfig(properties);
-
+    Config defaults = ConfigUtils.propertiesToConfig(properties);
+    Config config = additionalConfigs.withFallback(defaults);
     SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
 
     FlowSpec.Builder flowSpecBuilder = null;
@@ -141,7 +148,7 @@ public class FlowCatalogTest {
   }
 
   @Test
-  public void createFlowSpec() {
+  public void createFlowSpec() throws Throwable {
     // List Current Specs
     Collection<Spec> specs = flowCatalog.getSpecs();
     logger.info("[Before Create] Number of specs: " + specs.size());
@@ -199,7 +206,7 @@ public class FlowCatalogTest {
   }
 
   @Test (dependsOnMethods = "deleteFlowSpec")
-  public void testRejectBadFlow() {
+  public void testRejectBadFlow() throws Throwable {
     Collection<Spec> specs = flowCatalog.getSpecs();
     logger.info("[Before Create] Number of specs: " + specs.size());
     int i=0;
@@ -223,7 +230,7 @@ public class FlowCatalogTest {
   }
 
   @Test (dependsOnMethods = "testRejectBadFlow")
-  public void testRejectMissingListener() {
+  public void testRejectMissingListener() throws Throwable {
     flowCatalog.removeListener(this.mockListener);
     Collection<Spec> specs = flowCatalog.getSpecs();
     logger.info("[Before Create] Number of specs: " + specs.size());
@@ -244,6 +251,32 @@ public class FlowCatalogTest {
     Assert.assertEquals(flowCatalog.getSize(), 0);
   }
 
+  @Test (dependsOnMethods = "testRejectMissingListener")
+  public void testRejectQuotaExceededFlow() {
+    Collection<Spec> specs = flowCatalog.getSpecs();
+    logger.info("[Before Create] Number of specs: " + specs.size());
+    int i=0;
+    for (Spec spec : specs) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+    }
+    Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
+
+    // Create and add Spec
+    FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
+
+    // Assume that spec is rejected
+    when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException(new QuotaExceededException("error")));
+    try {
+      Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
+    } catch (Throwable e) {
+      Assert.assertTrue(e instanceof QuotaExceededException);
+    }
+    // Spec should be rejected from being stored
+    specs = flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 0);
+  }
+
   public static URI computeFlowSpecURI() {
     // Make sure this is relative
     URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 5aa0b438e..101d9f46d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -108,7 +108,7 @@ public class GitConfigMonitor extends GitMonitoringService {
             .withVersion(SPEC_VERSION)
             .withDescription(SPEC_DESCRIPTION)
             .build());
-      } catch (IOException e) {
+      } catch (Throwable e) {
         log.warn("Could not load config file: " + configFilePath);
       }
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 1a1bce804..efb03339c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.core;
 
 import java.util.Objects;
 
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -194,6 +195,8 @@ public class GobblinServiceGuiceModule implements Module {
       binder.bind(Orchestrator.class);
       binder.bind(SchedulerService.class);
       binder.bind(GobblinServiceJobScheduler.class);
+      OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class);
+      binder.bind(UserQuotaManager.class);
     }
 
     if (serviceConfig.isGitConfigMonitorEnabled()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index c7fc958d0..d35eef64b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,6 +31,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -197,6 +198,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   @Inject
   protected ServiceDatabaseManager databaseManager;
 
+  @Inject(optional=true)
+  @Getter
+  protected Optional<UserQuotaManager> quotaManager;
+
   protected Optional<HelixLeaderState> helixLeaderGauges;
 
   @Inject(optional = true)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index a472a37cb..9d59a9085 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -32,6 +33,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import java.util.stream.Collectors;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -41,6 +43,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -340,4 +343,18 @@ public class DagManagerUtils {
       eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
     }
   }
+
+  static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
+    List<String> uniqueRequesters;
+    try {
+      uniqueRequesters = RequesterService.deserialize(serializedRequesters)
+          .stream()
+          .map(ServiceRequester::getName)
+          .distinct()
+          .collect(Collectors.toList());
+      return uniqueRequesters;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not process requesters due to ", e);
+    }
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index c49cdfc7f..36b4d2718 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.HashSet;
@@ -24,9 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import javax.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -40,6 +42,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * is exceeded, then the execution will fail without running on the underlying executor
  */
 @Slf4j
+@Singleton
 public class UserQuotaManager {
   public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
   public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
@@ -54,7 +57,8 @@ public class UserQuotaManager {
   Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
   private final int defaultQuota;
 
-  UserQuotaManager(Config config) {
+  @Inject
+  public UserQuotaManager(Config config) {
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
     ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
     ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
@@ -72,9 +76,9 @@ public class UserQuotaManager {
 
   /**
    * Checks if the dagNode exceeds the statically configured user quota for both the proxy user, requester user, and flowGroup
-   * @throws IOException if the quota is exceeded, and logs a statement
+   * @throws QuotaExceededException if the quota is exceeded, and logs a statement
    */
-  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException {
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws QuotaExceededException {
     // Dag is already being tracked, no need to double increment for retries and multihop flows
     if (isDagCurrentlyRunning(dagNode)) {
       return;
@@ -103,8 +107,7 @@ public class UserQuotaManager {
     boolean requesterCheck = true;
 
     if (serializedRequesters != null) {
-      List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
-          .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+      List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
       for (String requester : uniqueRequesters) {
         int userQuotaIncrement = incrementJobCountAndCheckQuota(
             DagManagerUtils.getUserQuotaKey(requester, dagNode), requesterToJobCount, dagNode, getQuotaForUser(requester));
@@ -135,7 +138,7 @@ public class UserQuotaManager {
       decrementQuotaUsage(flowGroupToJobCount, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
       decrementQuotaUsageForUsers(usersQuotaIncrement);
       runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
-      throw new IOException(requesterMessage.toString());
+      throw new QuotaExceededException(requesterMessage.toString());
     }
   }
 
@@ -148,7 +151,9 @@ public class UserQuotaManager {
    */
   private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
     // Only increment job count for first attempt, since job is considered running between retries
-    if (dagNode.getValue().getCurrentAttempts() != 1) {
+    // Include the scenario where currentAttempts is 0 (when checked by the scheduler)
+    // but it will not double increment due to first ensuring that the dag is not already incremented
+    if (dagNode.getValue().getCurrentAttempts() > 1) {
       return 0;
     }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ffd2f15a4..d00365f13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
@@ -54,6 +55,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.utils.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -91,6 +93,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   protected final Optional<FlowCatalog> flowCatalog;
   protected final Optional<HelixManager> helixManager;
   protected final Orchestrator orchestrator;
+  protected final Optional<UserQuotaManager> quotaManager;
   @Getter
   protected final Map<String, Spec> scheduledFlowSpecs;
   @Getter
@@ -119,7 +122,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   @Inject
   public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
-      Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+      Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -130,14 +133,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     this.scheduledFlowSpecs = Maps.newHashMap();
     this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
+    this.quotaManager = quotaManager;
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
       Optional<HelixManager> helixManager,
-      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
+      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager,
       SchedulerService schedulerService,  Optional<Logger> log) throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, log);
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log);
   }
 
   public synchronized void setActive(boolean isActive) {
@@ -322,6 +326,18 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       return new AddSpecResponse<>(response);
     }
 
+    // Check quota limits against run immediately flows or adhoc flows before saving the schedule
+    if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+      if (quotaManager.isPresent()) {
+        // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
+        try {
+          quotaManager.get().checkQuota(dag.getNodes().get(0), false);
+        } catch (QuotaExceededException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 2572fda66..13e607d19 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.eclipse.jgit.api.Git;
@@ -162,7 +164,7 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
 
     serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, MockedSpecCompiler.class.getCanonicalName());
-
+    serviceCoreProperties.put(UserQuotaManager.PER_USER_QUOTA, "testUser:1");
     transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
 
     // Create a bare repository
@@ -308,6 +310,25 @@ public class GobblinServiceManagerTest {
   }
 
   @Test (dependsOnMethods = "testRunOnceJob")
+  public void testRunQuotaExceeds() throws Exception {
+    Map<String, String> props = flowProperties;
+    props.put(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, "testUser");
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props));
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props));
+
+    try {
+      this.flowConfigClient.createFlowConfig(flowConfig2);
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.SERVICE_UNAVAILABLE_503);
+    }
+  }
+
+  @Test (dependsOnMethods = "testRunQuotaExceeds")
   public void testExplainJob() throws Exception {
     FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
         .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 22208498d..c25c91b59 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -226,7 +226,7 @@ public class OrchestratorTest {
   }
 
   @Test (dependsOnMethods = "createTopologySpec")
-  public void createFlowSpec() throws Exception {
+  public void createFlowSpec() throws Throwable {
     // Since only 1 Topology with 1 SpecProducer has been added in previous test
     // .. it should be available and responsible for our new FlowSpec
     IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
index 98ebc6bca..9fc9438db 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -52,8 +52,6 @@ public class UserQuotaManagerTest {
     this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
     // Should not be throwing the exception
     this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
-
-    // TODO: add verification when adding a public method for getting the current count and quota per user
   }
 
   @Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a9323971b..0750392ee 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -21,26 +21,37 @@ import com.google.common.base.Predicate;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -48,6 +59,7 @@ import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
@@ -61,11 +73,16 @@ public class GobblinServiceJobSchedulerTest {
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
 
+  private Config quotaConfig;
+  @BeforeClass
+  public void setUp() {
+    this.quotaConfig = ConfigFactory.empty().withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1"));
+  }
   /**
    * Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually)
    */
   @Test
-  public void testJobSchedulerInit() throws Exception {
+  public void testJobSchedulerInit() throws Throwable {
     // Mock a FlowCatalog.
     File specDir = Files.createTempDir();
 
@@ -90,10 +107,11 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
 
     Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    UserQuotaManager quotaManager = new UserQuotaManager(quotaConfig);
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -145,7 +163,7 @@ public class GobblinServiceJobSchedulerTest {
    * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
    */
   @Test
-  public void testJobSchedulerInitWithFailedSpec() throws Exception {
+  public void testJobSchedulerInitWithFailedSpec() throws Throwable {
     // Mock a FlowCatalog.
     File specDir = Files.createTempDir();
 
@@ -179,7 +197,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -209,7 +227,7 @@ public class GobblinServiceJobSchedulerTest {
    * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
    */
   @Test
-  public void testJobSchedulerUnschedule() throws Exception {
+  public void testJobSchedulerUnschedule() throws Throwable {
     // Mock a FlowCatalog.
     File specDir = Files.createTempDir();
 
@@ -242,7 +260,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService );
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService );
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -283,14 +301,60 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0);
   }
 
+  @Test
+  public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
+        ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+    FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
+        ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+    when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
+    Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
+    Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
+    when(mockSpecCompiler.compileFlow(flowSpec0)).thenReturn(mockDag0);
+    when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
+
+    SchedulerService schedulerService = new SchedulerService(new Properties());
+    // Mock a GaaS scheduler.
+    GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent());
+
+    schedulerService.startAsync().awaitRunning();
+    scheduler.startUp();
+    scheduler.setActive(true);
+
+    scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
+    Assert.assertThrows(RuntimeException.class, () -> scheduler.onAddSpec(flowSpec1));
+
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+    // Second flow should not be added to scheduled flows since it was rejected
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+    // set scheduler to be inactive and unschedule flows
+    scheduler.setActive(false);
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+  }
+
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
     public boolean isCompilerHealthy = false;
     private boolean hasScheduler = false;
 
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
-        Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
+        Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
         SchedulerService schedulerService) throws Exception {
-      super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, Optional.absent());
+      super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent());
       if (schedulerService != null) {
         hasScheduler = true;
       }
@@ -318,4 +382,18 @@ public class GobblinServiceJobSchedulerTest {
       return new AddSpecResponse(addedSpec.getDescription());
     }
   }
+
+   Dag<JobExecutionPlan> buildDag(Config additionalConfig, String id) throws URISyntaxException {
+    Config config = ConfigFactory.empty().
+        withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+
+    config = additionalConfig.withFallback(config);
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    JobSpec js = JobSpec.builder("test_job_" + id).withVersion(id).withConfig(config).
+        withTemplate(new URI("job_" + id)).build();
+    SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("jobExecutor"));
+    JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+    jobExecutionPlans.add(jobExecutionPlan);
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
 }
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java
new file mode 100644
index 000000000..6ab63d150
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.exception;
+
+import java.io.IOException;
+
+
+public class QuotaExceededException extends IOException {
+
+  public QuotaExceededException(String message) {
+    super(message);
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
index 0132a8249..8c1f751a3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gobblin.util.callbacks;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Preconditions;
@@ -64,7 +63,7 @@ public class CallbackResult<R> {
       R res = execFuture.get();
       return createSuccessful(res);
     }
-    catch (ExecutionException e) {
+    catch (Exception e) {
       if (execFuture.isCancelled()) {
         return createCancelled();
       }