You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/01 16:20:30 UTC
[gobblin] branch master updated: [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4a80b2adc [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
4a80b2adc is described below
commit 4a80b2adc90e7f3962f85fec93aec1a51e58c818
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu Sep 1 09:20:23 2022 -0700
[GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
* add orchestrator as listener before service start
* fix code style
* address comments
* fix test case to test orchestrator as one listener of flow spec
* remove unintentional change
* remove unused import
* address comments
* fix typo
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../apache/gobblin/service/ServiceConfigKeys.java | 3 ++
.../service/FlowConfigV2ResourceLocalHandler.java | 2 +-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 9 +++-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 23 +++++++---
.../gobblin/runtime/util}/InjectionNames.java | 3 +-
.../modules/core/GobblinServiceConfiguration.java | 5 +++
.../modules/core/GobblinServiceGuiceModule.java | 5 ++-
.../modules/core/GobblinServiceManager.java | 5 ++-
.../modules/flow/BaseFlowToJobSpecCompiler.java | 52 +++++++++++++++++++---
.../modules/orchestration/Orchestrator.java | 9 +++-
.../GobblinServiceFlowConfigResourceHandler.java | 2 +-
.../GobblinServiceFlowConfigV2ResourceHandler.java | 2 +-
...GobblinServiceFlowExecutionResourceHandler.java | 2 +-
.../scheduler/GobblinServiceJobScheduler.java | 17 ++++---
.../modules/orchestration/OrchestratorTest.java | 42 ++++++++++++-----
.../scheduler/GobblinServiceJobSchedulerTest.java | 30 ++++++++++---
16 files changed, 166 insertions(+), 45 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c1536e037..dc149e20b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -26,6 +26,7 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
+ public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";
// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
@@ -38,6 +39,7 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
+ public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
@@ -57,6 +59,7 @@ public class ServiceConfigKeys {
// Flow Compiler Keys
public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
+ public static final String COMPILATION_RESPONSE = "compilation.response";
// Flow Catalog Keys
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index d2bb55743..6370192ca 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -95,7 +95,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
//This is an Explain request. So no resource is actually created.
//Enrich original FlowConfig entity by adding the compiledFlow to the properties map.
StringMap props = flowConfig.getProperties();
- AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+ AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_RESPONSE, null);
props.put("gobblin.flow.compiled",
addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
flowConfig.setProperties(props);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 5675da1a1..edeea4744 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -133,7 +133,8 @@ public class FlowSpec implements Configurable, Spec {
public static class CompilationError {
public int errorPriority;
public String errorMessage;
- CompilationError(Config config, String src, String dst, String errorMessage) {
+
+ public CompilationError(Config config, String src, String dst, String errorMessage) {
errorPriority = 0;
if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
errorPriority++;
@@ -144,8 +145,14 @@ public class FlowSpec implements Configurable, Spec {
}
this.errorMessage = errorMessage;
}
+
+ public CompilationError(int errorPriority, String errorMessage) {
+ this.errorPriority = errorPriority;
+ this.errorMessage = errorMessage;
+ }
}
+
public String toShortString() {
return getUri().toString() + "/" + getVersion();
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index b269279eb..6a64ae9bb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -28,7 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import javax.inject.Named;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,6 +87,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
protected final Logger log;
protected final MetricContext metricContext;
protected final MutableStandardMetrics metrics;
+ protected final boolean isWarmStandbyEnabled;
@Getter
protected final SpecStore specStore;
// a map which keeps a handle of condition variables for each spec being added to the flow catalog
@@ -98,17 +101,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
public FlowCatalog(Config config, Optional<Logger> log) {
- this(config, log, Optional.<MetricContext>absent(), true);
+ this(config, log, Optional.<MetricContext>absent(), true, false);
}
@Inject
- public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
+ public FlowCatalog(Config config, GobblinInstanceEnvironment env, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled) {
this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
- env.isInstrumentationEnabled());
+ env.isInstrumentationEnabled(), isWarmStandbyEnabled);
}
public FlowCatalog(Config config, Optional<Logger> log, Optional<MetricContext> parentMetricContext,
- boolean instrumentationEnabled) {
+ boolean instrumentationEnabled, boolean isWarmStandbyEnabled) {
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.listeners = new SpecCatalogListenersList(log);
if (instrumentationEnabled) {
@@ -121,6 +124,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
this.metricContext = null;
this.metrics = null;
}
+ this.isWarmStandbyEnabled = isWarmStandbyEnabled;
this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
@@ -383,10 +387,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
return responseMap;
}
}
- AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
+ AddSpecResponse<String> compileResponse;
+ if (isWarmStandbyEnabled) {
+ compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS, new AddSpecResponse<>(null));
+ //todo: do we check quota here? or in compiler? Quota manager need dag to check quota which is not accessable from this class
+ } else {
+ compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
+ }
+ responseMap.put(ServiceConfigKeys.COMPILATION_RESPONSE, compileResponse);
// Check that the flow configuration is valid and matches to a corresponding edge
- if (isCompileSuccessful(schedulerResponse.getValue())) {
+ if (isCompileSuccessful(compileResponse.getValue())) {
synchronized (syncObject) {
try {
if (!flowSpec.isExplain()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
similarity index 90%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
rename to gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index ebe6142af..d0e42f525 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.utils;
+package org.apache.gobblin.runtime.util;
/**
* These names are used for dependency injection, when we need to inject different instances of the same type,
@@ -25,4 +25,5 @@ public final class InjectionNames {
public static final String SERVICE_NAME = "serviceName";
public static final String FORCE_LEADER = "forceLeader";
public static final String FLOW_CATALOG_LOCAL_COMMIT = "flowCatalogLocalCommit";
+ public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 25c263a4d..0a3640da5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -40,6 +40,9 @@ public class GobblinServiceConfiguration {
@Getter
private final String serviceId;
+ @Getter
+ private final boolean isWarmStandbyEnabled;
+
@Getter
private final boolean isTopologyCatalogEnabled;
@@ -103,6 +106,8 @@ public class GobblinServiceConfiguration {
isGitConfigMonitorEnabled = false;
}
+ this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+
this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
this.isDagManagerEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 68170a129..653354d45 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -73,7 +73,7 @@ import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -136,6 +136,9 @@ public class GobblinServiceGuiceModule implements Module {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT))
.to(serviceConfig.isFlowCatalogLocalCommit());
+ binder.bindConstant()
+ .annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
+ .to(serviceConfig.isWarmStandbyEnabled());
binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 76f568495..463b19eab 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -394,7 +394,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
registerServicesInLauncher();
// Register Scheduler to listen to changes in Flows
- if (configuration.isSchedulerEnabled()) {
+ // In warm standby mode, instead of scheduler we will add orchestrator as listener
+ if(configuration.isWarmStandbyEnabled()) {
+ this.flowCatalog.addListener(this.orchestrator);
+ } else if (configuration.isSchedulerEnabled()) {
this.flowCatalog.addListener(this.scheduler);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 244a2fad2..f910e9e4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
+import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,15 +150,52 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
+ }
+
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (topologySpecMap.containsKey(flowSpec.getUri())) {
+ log.error("flow spec URI: {} is the same as one of the spec executors uris, ignore the flow", flowSpec.getUri());
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid flow spec uri " + flowSpec.getUri() + " because it is the same as one of the spec executors uri"));
+ return null;
+ }
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+ return null;
+ }
}
+ String response = null;
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
+ }
+
+ @Override
+ public AddSpecResponse onAddSpec(Spec addedSpec) {
+ if (addedSpec instanceof FlowSpec) {
+ return onAddFlowSpec((FlowSpec) addedSpec);
+ } else if (addedSpec instanceof TopologySpec) {
+ return onAddTopologySpec( (TopologySpec) addedSpec);
+ }
return new AddSpecResponse(null);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 46a37e74c..19a1acdc5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -75,7 +75,8 @@ import org.apache.gobblin.util.ConfigUtils;
/**
* Orchestrator that is a {@link SpecCatalogListener}. It listens to changes
- * to {@link TopologyCatalog} and updates {@link SpecCompiler} state.
+ * to {@link TopologyCatalog} and updates {@link SpecCompiler} state
+ * Also it listens to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} and use the compiler to compile the new flow spec.
*/
@Alpha
@Singleton
@@ -169,6 +170,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
if (addedSpec instanceof TopologySpec) {
_log.info("New Spec detected of type TopologySpec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
+ } else if (addedSpec instanceof FlowSpec) {
+ _log.info("New Spec detected of type FlowSpec: " + addedSpec);
+ return this.specCompiler.onAddSpec(addedSpec);
}
return new AddSpecResponse(null);
}
@@ -191,6 +195,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
@Override
public void onUpdateSpec(Spec updatedSpec) {
_log.info("Spec changed: " + updatedSpec);
+ if (updatedSpec instanceof FlowSpec) {
+ onAddSpec(updatedSpec);
+ }
if (!(updatedSpec instanceof TopologySpec)) {
return;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 44890a21d..8045ae171 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -50,7 +50,7 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
index 74ed9a0dd..6389d4d0e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
@@ -27,7 +27,7 @@ import javax.inject.Named;
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
public class GobblinServiceFlowConfigV2ResourceHandler extends GobblinServiceFlowConfigResourceHandler
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 15ffc2f42..489eb2b4d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -40,7 +40,7 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d00365f13..c8286740a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -57,7 +57,7 @@ import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -93,6 +93,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
protected final Orchestrator orchestrator;
+ protected final Boolean warmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
@@ -120,9 +121,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
public static final String DR_FILTER_TAG = "dr";
@Inject
- public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
+ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+ Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
- Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log) throws Exception {
+ Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
+ @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -133,15 +136,16 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.scheduledFlowSpecs = Maps.newHashMap();
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
+ this.warmStandbyEnabled = warmStandbyEnabled;
this.quotaManager = quotaManager;
}
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager,
- SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+ SchedulerService schedulerService, Optional<Logger> log, boolean warmStandbyEnabled) throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log);
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
}
public synchronized void setActive(boolean isActive) {
@@ -327,7 +331,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
// Check quota limits against run immediately flows or adhoc flows before saving the schedule
- if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ // In warm standby mode, this quota check will happen on restli API layer when we accept the flow
+ if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index c25c91b59..b1cec2b3a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -31,7 +31,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -59,7 +57,6 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
@@ -103,14 +100,10 @@ public class OrchestratorTest {
Optional.of(logger));
this.serviceLauncher.addService(topologyCatalog);
+ // Test warm standby flow catalog, which has orchestrator as listener
this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
- Optional.of(logger));
-
- this.mockListener = mock(SpecCatalogListener.class);
- when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
- when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ Optional.of(logger), Optional.<MetricContext>absent(), true, true);
- this.flowCatalog.addListener(mockListener);
this.serviceLauncher.addService(flowCatalog);
this.mockStatusGenerator = mock(FlowStatusGenerator.class);
@@ -119,7 +112,6 @@ public class OrchestratorTest {
Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
-
// Start application
this.serviceLauncher.start();
// Create Spec to play with
@@ -152,6 +144,33 @@ public class OrchestratorTest {
}
private FlowSpec initFlowSpec() {
+ Properties properties = new Properties();
+ String flowName = "test_flowName";
+ String flowGroup = "test_flowGroup";
+ properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+ properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
+ properties.put("job.name", flowName);
+ properties.put("job.group", flowGroup);
+ properties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
+ properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put("job.schedule", "0 0 0 ? * * 2050");
+ ;
+ properties.put("gobblin.flow.sourceIdentifier", "source");
+ properties.put("gobblin.flow.destinationIdentifier", "destination");
+ Config config = ConfigUtils.propertiesToConfig(properties);
+
+ FlowSpec.Builder flowSpecBuilder = null;
+ flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
+ FLOW_SPEC_GROUP_DIR))
+ .withConfig(config)
+ .withDescription(SPEC_DESCRIPTION)
+ .withVersion(SPEC_VERSION)
+ .withTemplate(URI.create("templateURI"));
+ return flowSpecBuilder.build();
+ }
+
+ private FlowSpec initBadFlowSpec() {
+ // Bad Flow Spec as we don't set the job name, and will fail the compilation
Properties properties = new Properties();
properties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
properties.put("specExecInstance.capabilities", "source:destination");
@@ -245,6 +264,9 @@ public class OrchestratorTest {
// Make sure FlowCatalog Listener is empty
Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should not know about "
+ "any Flow before addition");
+ // Make sure we cannot add a flow to the specCatalog if its flowSpec cannot compile
+ Assert.expectThrows(Exception.class,() -> this.flowCatalog.put(initBadFlowSpec()));
+ Assert.assertTrue(specs.size() == 0, "Spec store should be empty after adding bad flow spec");
// Create and add Spec
this.flowCatalog.put(flowSpec);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 0750392ee..a219148bc 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -111,7 +111,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null, false);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -197,7 +197,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null, false);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -260,7 +260,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService );
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService, false);
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -328,9 +328,9 @@ public class GobblinServiceJobSchedulerTest {
when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
SchedulerService schedulerService = new SchedulerService(new Properties());
- // Mock a GaaS scheduler.
+ // Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent());
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), false);
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -345,6 +345,22 @@ public class GobblinServiceJobSchedulerTest {
// set scheduler to be inactive and unschedule flows
scheduler.setActive(false);
Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+
+ // Mock a GaaS scheduler in warm standby mode, where quota is not checked
+ GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), true);
+
+ schedulerWithWarmStandbyEnabled.startUp();
+ schedulerWithWarmStandbyEnabled.setActive(true);
+
+ schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec0); //Ignore the response for this request
+ Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 1);
+ schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
+ // Second flow should be added to scheduled flows since no quota check in this case
+ Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 2);
+ // set scheduler to be inactive and unschedule flows
+ schedulerWithWarmStandbyEnabled.setActive(false);
+ Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 0);
}
class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
@@ -353,8 +369,8 @@ public class GobblinServiceJobSchedulerTest {
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
- SchedulerService schedulerService) throws Exception {
- super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent());
+ SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception {
+ super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled);
if (schedulerService != null) {
hasScheduler = true;
}