You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/18 20:53:26 UTC

[gobblin] branch master updated: [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0936016be [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
0936016be is described below

commit 0936016be7e3b2a098fb8666fbb6829ea6083e46
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue Oct 18 13:53:19 2022 -0700

    [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
    
    * address comments
    
    * use ConnectionManager when HttpClient is not closeable
    
    * [GOBBLIN-1725] Fix bugs in gaas warm standby mode
    
    * fix missing config
    
    * address comments
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../service/modules/flow/BaseFlowToJobSpecCompiler.java   | 13 +++++++------
 .../gobblin/service/modules/orchestration/DagManager.java | 15 +++++++++------
 .../gobblin/service/monitoring/GitFlowGraphMonitor.java   |  2 +-
 .../service/modules/orchestration/DagManagerFlowTest.java |  9 +++------
 4 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 976d12650..d0e997771 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import javax.inject.Inject;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
@@ -191,7 +192,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
 
-    if (this.warmStandbyEnabled &&
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+
+    if (FlowCatalog.isCompileSuccessful(response) && this.warmStandbyEnabled && !flowSpec.isExplain() &&
         (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
       try {
         userQuotaManager.checkQuota(dag.getStartNodes());
@@ -201,11 +207,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
       }
     }
 
-    // If dag is null then a compilation error has occurred
-    if (dag != null && !dag.isEmpty()) {
-      response = dag.toString();
-    }
-    // todo: should we check quota here?
     return new AddSpecResponse<>(response);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 4391a5f9b..0bcceaf62 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -200,11 +200,13 @@ public class DagManager extends AbstractIdleService {
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
   private final DagManagerMetrics dagManagerMetrics;
-  private final Optional<DagActionStore> dagActionStore;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled) {
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
     this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
@@ -221,7 +223,6 @@ public class DagManager extends AbstractIdleService {
     } else {
       this.eventSubmitter = Optional.absent();
     }
-    this.dagActionStore = dagActionStore;
     this.dagManagerMetrics = new DagManagerMetrics(metricContext);
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -249,9 +250,9 @@ public class DagManager extends AbstractIdleService {
     return queue;
   }
 
-  @Inject(optional = true)
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Optional<DagActionStore> dagActionStore) {
-    this(config, jobStatusRetriever, dagActionStore, true);
+  @Inject
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
+    this(config, jobStatusRetriever, true);
   }
 
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
@@ -665,6 +666,8 @@ public class DagManager extends AbstractIdleService {
         props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
         sendCancellationEvent(dagNodeToCancel.getValue());
       }
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          ConfigUtils.getString(dagNodeToCancel.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ""));
       DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
     }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
index 684ab2c90..68174fcd6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -66,9 +66,9 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
       .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
-      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
       .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
       .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+      .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
       .put(SHOULD_CHECKPOINT_HASHES, false).build());
 
   private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 71ddd9333..59aeda41f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -88,7 +88,8 @@ public class DagManagerFlowTest {
     dagActionStore = new MysqlDagActionStore(config);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.KILL);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
-    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), Optional.of(dagActionStore), false);
+    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
+    dagManager.dagActionStore = Optional.of(dagActionStore);
     dagManager.setActive(true);
     this.dagNumThreads = dagManager.getNumThreads();
     Thread.sleep(10000);
@@ -325,11 +326,7 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), Optional.absent(), instrumentationEnabled);
-  }
-
-  public MockedDagManager(Config config, Optional<DagActionStore> dagactionStore, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), dagactionStore, instrumentationEnabled);
+    super(config, createJobStatusRetriever(), instrumentationEnabled);
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {