You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/01 16:20:30 UTC

[gobblin] branch master updated: [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a80b2adc [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
4a80b2adc is described below

commit 4a80b2adc90e7f3962f85fec93aec1a51e58c818
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu Sep 1 09:20:23 2022 -0700

    [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
    
    * add orchestrator as listener before service start
    
    * fix code style
    
    * address comments
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * remove unused import
    
    * address comments
    
    * fix typo
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  3 ++
 .../service/FlowConfigV2ResourceLocalHandler.java  |  2 +-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  9 +++-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 23 +++++++---
 .../gobblin/runtime/util}/InjectionNames.java      |  3 +-
 .../modules/core/GobblinServiceConfiguration.java  |  5 +++
 .../modules/core/GobblinServiceGuiceModule.java    |  5 ++-
 .../modules/core/GobblinServiceManager.java        |  5 ++-
 .../modules/flow/BaseFlowToJobSpecCompiler.java    | 52 +++++++++++++++++++---
 .../modules/orchestration/Orchestrator.java        |  9 +++-
 .../GobblinServiceFlowConfigResourceHandler.java   |  2 +-
 .../GobblinServiceFlowConfigV2ResourceHandler.java |  2 +-
 ...GobblinServiceFlowExecutionResourceHandler.java |  2 +-
 .../scheduler/GobblinServiceJobScheduler.java      | 17 ++++---
 .../modules/orchestration/OrchestratorTest.java    | 42 ++++++++++++-----
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 30 ++++++++++---
 16 files changed, 166 insertions(+), 45 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c1536e037..dc149e20b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -26,6 +26,7 @@ public class ServiceConfigKeys {
 
   public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
   public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
+  public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";
 
   // Gobblin Service Manager Keys
   public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
@@ -38,6 +39,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
   public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
   public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
+  public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
   // If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
   public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
 
@@ -57,6 +59,7 @@ public class ServiceConfigKeys {
   // Flow Compiler Keys
   public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
   public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
+  public static final String COMPILATION_RESPONSE = "compilation.response";
 
   // Flow Catalog Keys
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index d2bb55743..6370192ca 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -95,7 +95,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
       //This is an Explain request. So no resource is actually created.
       //Enrich original FlowConfig entity by adding the compiledFlow to the properties map.
       StringMap props = flowConfig.getProperties();
-      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_RESPONSE, null);
       props.put("gobblin.flow.compiled",
           addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
       flowConfig.setProperties(props);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 5675da1a1..edeea4744 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -133,7 +133,8 @@ public class FlowSpec implements Configurable, Spec {
   public static class CompilationError {
     public int errorPriority;
     public String errorMessage;
-    CompilationError(Config config, String src, String dst, String errorMessage) {
+
+    public CompilationError(Config config, String src, String dst, String errorMessage) {
       errorPriority = 0;
       if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
         errorPriority++;
@@ -144,8 +145,14 @@ public class FlowSpec implements Configurable, Spec {
       }
       this.errorMessage = errorMessage;
     }
+
+    public CompilationError(int errorPriority, String errorMessage) {
+      this.errorPriority = errorPriority;
+      this.errorMessage = errorMessage;
+    }
   }
 
+
   public String toShortString() {
     return getUri().toString() + "/" + getVersion();
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index b269279eb..6a64ae9bb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -28,7 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import javax.inject.Named;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +87,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   protected final Logger log;
   protected final MetricContext metricContext;
   protected final MutableStandardMetrics metrics;
+  protected final boolean isWarmStandbyEnabled;
   @Getter
   protected final SpecStore specStore;
   // a map which keeps a handle of condition variables for each spec being added to the flow catalog
@@ -98,17 +101,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   public FlowCatalog(Config config, Optional<Logger> log) {
-    this(config, log, Optional.<MetricContext>absent(), true);
+    this(config, log, Optional.<MetricContext>absent(), true, false);
   }
 
   @Inject
-  public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
+  public FlowCatalog(Config config, GobblinInstanceEnvironment env, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled) {
     this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
-        env.isInstrumentationEnabled());
+        env.isInstrumentationEnabled(), isWarmStandbyEnabled);
   }
 
   public FlowCatalog(Config config, Optional<Logger> log, Optional<MetricContext> parentMetricContext,
-      boolean instrumentationEnabled) {
+      boolean instrumentationEnabled, boolean isWarmStandbyEnabled) {
     this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.listeners = new SpecCatalogListenersList(log);
     if (instrumentationEnabled) {
@@ -121,6 +124,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       this.metricContext = null;
       this.metrics = null;
     }
+    this.isWarmStandbyEnabled = isWarmStandbyEnabled;
 
     this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
 
@@ -383,10 +387,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
         return responseMap;
       }
     }
-    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
+    AddSpecResponse<String> compileResponse;
+    if (isWarmStandbyEnabled) {
+      compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS, new AddSpecResponse<>(null));
+      //todo: do we check quota here? or in compiler? Quota manager need dag to check quota which is not accessable from this class
+    } else {
+      compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
+    }
+    responseMap.put(ServiceConfigKeys.COMPILATION_RESPONSE, compileResponse);
 
     // Check that the flow configuration is valid and matches to a corresponding edge
-    if (isCompileSuccessful(schedulerResponse.getValue())) {
+    if (isCompileSuccessful(compileResponse.getValue())) {
       synchronized (syncObject) {
         try {
           if (!flowSpec.isExplain()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
similarity index 90%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
rename to gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index ebe6142af..d0e42f525 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.utils;
+package org.apache.gobblin.runtime.util;
 
 /**
  * These names are used for dependency injection, when we need to inject different instances of the same type,
@@ -25,4 +25,5 @@ public final class InjectionNames {
   public static final String SERVICE_NAME = "serviceName";
   public static final String FORCE_LEADER = "forceLeader";
   public static final String FLOW_CATALOG_LOCAL_COMMIT = "flowCatalogLocalCommit";
+  public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 25c263a4d..0a3640da5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -40,6 +40,9 @@ public class GobblinServiceConfiguration {
   @Getter
   private final String serviceId;
 
+  @Getter
+  private final boolean isWarmStandbyEnabled;
+
   @Getter
   private final boolean isTopologyCatalogEnabled;
 
@@ -103,6 +106,8 @@ public class GobblinServiceConfiguration {
       isGitConfigMonitorEnabled = false;
     }
 
+    this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+
     this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
     this.isDagManagerEnabled =
         ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 68170a129..653354d45 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -73,7 +73,7 @@ import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -136,6 +136,9 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bindConstant()
         .annotatedWith(Names.named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT))
         .to(serviceConfig.isFlowCatalogLocalCommit());
+    binder.bindConstant()
+        .annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
+        .to(serviceConfig.isWarmStandbyEnabled());
 
     binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
     binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 76f568495..463b19eab 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -394,7 +394,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     registerServicesInLauncher();
 
     // Register Scheduler to listen to changes in Flows
-    if (configuration.isSchedulerEnabled()) {
+    // In warm standby mode, instead of scheduler we will add orchestrator as listener
+    if(configuration.isWarmStandbyEnabled()) {
+      this.flowCatalog.addListener(this.orchestrator);
+    } else if (configuration.isSchedulerEnabled()) {
       this.flowCatalog.addListener(this.scheduler);
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 244a2fad2..f910e9e4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
+import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,15 +150,52 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
+    }
+
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (topologySpecMap.containsKey(flowSpec.getUri())) {
+      log.error("flow spec URI: {} is the same as one of the spec executors uris, ignore the flow", flowSpec.getUri());
+      flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid flow spec uri " + flowSpec.getUri() + " because it is the same as one of the spec executors uri"));
+      return null;
+    }
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+        return null;
+      }
     }
+    String response = null;
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);
+  }
+
+  @Override
+  public AddSpecResponse onAddSpec(Spec addedSpec) {
+    if (addedSpec instanceof FlowSpec) {
+      return onAddFlowSpec((FlowSpec) addedSpec);
+    } else if (addedSpec instanceof TopologySpec) {
+      return onAddTopologySpec( (TopologySpec) addedSpec);
+    }
     return new AddSpecResponse(null);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 46a37e74c..19a1acdc5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -75,7 +75,8 @@ import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * Orchestrator that is a {@link SpecCatalogListener}. It listens to changes
- * to {@link TopologyCatalog} and updates {@link SpecCompiler} state.
+ * to {@link TopologyCatalog} and updates {@link SpecCompiler} state
+ * Also it listens to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} and use the compiler to compile the new flow spec.
  */
 @Alpha
 @Singleton
@@ -169,6 +170,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     if (addedSpec instanceof TopologySpec) {
       _log.info("New Spec detected of type TopologySpec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
+    } else if (addedSpec instanceof FlowSpec) {
+      _log.info("New Spec detected of type FlowSpec: " + addedSpec);
+      return this.specCompiler.onAddSpec(addedSpec);
     }
     return new AddSpecResponse(null);
   }
@@ -191,6 +195,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   @Override
   public void onUpdateSpec(Spec updatedSpec) {
     _log.info("Spec changed: " + updatedSpec);
+    if (updatedSpec instanceof FlowSpec) {
+      onAddSpec(updatedSpec);
+    }
 
     if (!(updatedSpec instanceof TopologySpec)) {
       return;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 44890a21d..8045ae171 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -50,7 +50,7 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
 
 
 /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
index 74ed9a0dd..6389d4d0e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
@@ -27,7 +27,7 @@ import javax.inject.Named;
 import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
 
 
 public class GobblinServiceFlowConfigV2ResourceHandler extends GobblinServiceFlowConfigResourceHandler
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 15ffc2f42..489eb2b4d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -40,7 +40,7 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
 import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d00365f13..c8286740a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -57,7 +57,7 @@ import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -93,6 +93,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   protected final Optional<FlowCatalog> flowCatalog;
   protected final Optional<HelixManager> helixManager;
   protected final Orchestrator orchestrator;
+  protected final Boolean warmStandbyEnabled;
   protected final Optional<UserQuotaManager> quotaManager;
   @Getter
   protected final Map<String, Spec> scheduledFlowSpecs;
@@ -120,9 +121,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   public static final String DR_FILTER_TAG = "dr";
 
   @Inject
-  public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
+  public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+      Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
-      Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log) throws Exception {
+      Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
+      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -133,15 +136,16 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     this.scheduledFlowSpecs = Maps.newHashMap();
     this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
+    this.warmStandbyEnabled = warmStandbyEnabled;
     this.quotaManager = quotaManager;
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
       Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager,
-      SchedulerService schedulerService,  Optional<Logger> log) throws Exception {
+      SchedulerService schedulerService,  Optional<Logger> log, boolean warmStandbyEnabled) throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log);
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
   }
 
   public synchronized void setActive(boolean isActive) {
@@ -327,7 +331,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
 
     // Check quota limits against run immediately flows or adhoc flows before saving the schedule
-    if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+    // In warm standby mode, this quota check will happen on restli API layer when we accept the flow
+    if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
       if (quotaManager.isPresent()) {
         // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
         try {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index c25c91b59..b1cec2b3a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -31,7 +31,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -59,7 +57,6 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 
@@ -103,14 +100,10 @@ public class OrchestratorTest {
         Optional.of(logger));
     this.serviceLauncher.addService(topologyCatalog);
 
+    // Test warm standby flow catalog, which has orchestrator as listener
     this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
-        Optional.of(logger));
-
-    this.mockListener = mock(SpecCatalogListener.class);
-    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
-    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+        Optional.of(logger), Optional.<MetricContext>absent(), true, true);
 
-    this.flowCatalog.addListener(mockListener);
     this.serviceLauncher.addService(flowCatalog);
     this.mockStatusGenerator = mock(FlowStatusGenerator.class);
 
@@ -119,7 +112,6 @@ public class OrchestratorTest {
         Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
-
     // Start application
     this.serviceLauncher.start();
     // Create Spec to play with
@@ -152,6 +144,33 @@ public class OrchestratorTest {
   }
 
   private FlowSpec initFlowSpec() {
+    Properties properties = new Properties();
+    String flowName = "test_flowName";
+    String flowGroup = "test_flowGroup";
+    properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+    properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
+    properties.put("job.name", flowName);
+    properties.put("job.group", flowGroup);
+    properties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
+    properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put("job.schedule", "0 0 0 ? * * 2050");
+    ;
+    properties.put("gobblin.flow.sourceIdentifier", "source");
+    properties.put("gobblin.flow.destinationIdentifier", "destination");
+    Config config = ConfigUtils.propertiesToConfig(properties);
+
+    FlowSpec.Builder flowSpecBuilder = null;
+    flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
+            FLOW_SPEC_GROUP_DIR))
+        .withConfig(config)
+        .withDescription(SPEC_DESCRIPTION)
+        .withVersion(SPEC_VERSION)
+        .withTemplate(URI.create("templateURI"));
+    return flowSpecBuilder.build();
+  }
+
+  private FlowSpec initBadFlowSpec() {
+    // Bad flow spec: the job name is not set, so compilation will fail
     Properties properties = new Properties();
     properties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
     properties.put("specExecInstance.capabilities", "source:destination");
@@ -245,6 +264,9 @@ public class OrchestratorTest {
     // Make sure FlowCatalog Listener is empty
     Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should not know about "
         + "any Flow before addition");
+    // Make sure we cannot add a flow to the specCatalog if the flowSpec cannot compile
+    Assert.expectThrows(Exception.class,() -> this.flowCatalog.put(initBadFlowSpec()));
+    Assert.assertTrue(specs.size() == 0, "Spec store should be empty after adding bad flow spec");
 
     // Create and add Spec
     this.flowCatalog.put(flowSpec);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 0750392ee..a219148bc 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -111,7 +111,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null, false);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -197,7 +197,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null, false);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -260,7 +260,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService );
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService, false);
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -328,9 +328,9 @@ public class GobblinServiceJobSchedulerTest {
     when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
 
     SchedulerService schedulerService = new SchedulerService(new Properties());
-    // Mock a GaaS scheduler.
+    // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent());
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), false);
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -345,6 +345,22 @@ public class GobblinServiceJobSchedulerTest {
     // set scheduler to be inactive and unschedule flows
     scheduler.setActive(false);
     Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+
+    //Mock a GaaS scheduler in warm standby mode, where we don't check quota
+    GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), true);
+
+    schedulerWithWarmStandbyEnabled.startUp();
+    schedulerWithWarmStandbyEnabled.setActive(true);
+
+    schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec0); //Ignore the response for this request
+    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 1);
+    schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
+    // Second flow should be added to scheduled flows since no quota check in this case
+    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 2);
+    // set scheduler to be inactive and unschedule flows
+    schedulerWithWarmStandbyEnabled.setActive(false);
+    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 0);
   }
 
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
@@ -353,8 +369,8 @@ public class GobblinServiceJobSchedulerTest {
 
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
         Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
-        SchedulerService schedulerService) throws Exception {
-      super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent());
+        SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception {
+      super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled);
       if (schedulerService != null) {
         hasScheduler = true;
       }