You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/18 20:53:26 UTC
[gobblin] branch master updated: [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0936016be [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
0936016be is described below
commit 0936016be7e3b2a098fb8666fbb6829ea6083e46
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue Oct 18 13:53:19 2022 -0700
[GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1725] Fix bugs in gaas warm standby mode
* fix missing config
* address comments
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../service/modules/flow/BaseFlowToJobSpecCompiler.java | 13 +++++++------
.../gobblin/service/modules/orchestration/DagManager.java | 15 +++++++++------
.../gobblin/service/monitoring/GitFlowGraphMonitor.java | 2 +-
.../service/modules/orchestration/DagManagerFlowTest.java | 9 +++------
4 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 976d12650..d0e997771 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@@ -191,7 +192,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
- if (this.warmStandbyEnabled &&
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+
+ if (FlowCatalog.isCompileSuccessful(response) && this.warmStandbyEnabled && !flowSpec.isExplain() &&
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
try {
userQuotaManager.checkQuota(dag.getStartNodes());
@@ -201,11 +207,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
}
}
- // If dag is null then a compilation error has occurred
- if (dag != null && !dag.isEmpty()) {
- response = dag.toString();
- }
- // todo: should we check quota here?
return new AddSpecResponse<>(response);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 4391a5f9b..0bcceaf62 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -200,11 +200,13 @@ public class DagManager extends AbstractIdleService {
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
private final DagManagerMetrics dagManagerMetrics;
- private final Optional<DagActionStore> dagActionStore;
+
+ @Inject(optional=true)
+ protected Optional<DagActionStore> dagActionStore;
private volatile boolean isActive = false;
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled) {
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
@@ -221,7 +223,6 @@ public class DagManager extends AbstractIdleService {
} else {
this.eventSubmitter = Optional.absent();
}
- this.dagActionStore = dagActionStore;
this.dagManagerMetrics = new DagManagerMetrics(metricContext);
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -249,9 +250,9 @@ public class DagManager extends AbstractIdleService {
return queue;
}
- @Inject(optional = true)
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore) {
- this(config, jobStatusRetriever, dagActionStore, true);
+ @Inject
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
+ this(config, jobStatusRetriever, true);
}
/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
@@ -665,6 +666,8 @@ public class DagManager extends AbstractIdleService {
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
sendCancellationEvent(dagNodeToCancel.getValue());
}
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ ConfigUtils.getString(dagNodeToCancel.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ""));
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
index 684ab2c90..68174fcd6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -66,9 +66,9 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
.put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
.put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
.put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
- .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
.put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
.put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+ .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
.put(SHOULD_CHECKPOINT_HASHES, false).build());
private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 71ddd9333..59aeda41f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -88,7 +88,8 @@ public class DagManagerFlowTest {
dagActionStore = new MysqlDagActionStore(config);
dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
- dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), Optional.of(dagActionStore), false);
+ dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
+ dagManager.dagActionStore = Optional.of(dagActionStore);
dagManager.setActive(true);
this.dagNumThreads = dagManager.getNumThreads();
Thread.sleep(10000);
@@ -325,11 +326,7 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
public MockedDagManager(Config config, boolean instrumentationEnabled) {
- super(config, createJobStatusRetriever(), Optional.absent(), instrumentationEnabled);
- }
-
- public MockedDagManager(Config config, Optional<DagActionStore> dagactionStore, boolean instrumentationEnabled) {
- super(config, createJobStatusRetriever(), dagactionStore, instrumentationEnabled);
+ super(config, createJobStatusRetriever(), instrumentationEnabled);
}
private static JobStatusRetriever createJobStatusRetriever() {