You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/28 18:12:10 UTC

[gobblin] branch master updated: [GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's … (#3650)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eb809747e [GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's … (#3650)
eb809747e is described below

commit eb809747e265ac3b4da846a1d83c4cc850e5ae95
Author: William Lo <lo...@gmail.com>
AuthorDate: Tue Feb 28 10:12:03 2023 -0800

    [GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's … (#3650)
    
    * Prevent flowspec compilation errors from being added to the scheduler's added specs
    
    * Add a test asserting that compilation errors are empty after a successful compilation
---
 .../java/org/apache/gobblin/runtime/api/FlowSpec.java     |  4 ++++
 .../service/modules/flow/MultiHopFlowCompiler.java        | 15 +++++++--------
 .../service/modules/orchestration/Orchestrator.java       |  7 +++----
 .../service/modules/flow/MultiHopFlowCompilerTest.java    |  2 ++
 4 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 85e7a269b..bb9b6de3f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -136,6 +136,10 @@ public class FlowSpec implements Configurable, Spec {
     this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
   }
 
+  public void clearCompilationErrors() {
+    this.compilationErrors.clear();
+  }
+
   @EqualsAndHashCode
   public static class CompilationError {
     public int errorPriority;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d4ee50729..0f5bcf9fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -33,11 +33,8 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.hadoop.fs.Path;
-
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.slf4j.Logger;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -53,6 +50,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
@@ -60,20 +58,20 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
-import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
+import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.Logger;
 
 
 /***
@@ -286,7 +284,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     }
     Instrumented.markMeter(flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
+    // Clear compilation errors now that compilation is successful
+    flowSpec.clearCompilationErrors();
     return jobExecutionPlanDag;
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7dbc55011..c9a21560c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -27,7 +27,6 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +71,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -248,6 +248,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       boolean allowConcurrentExecution = ConfigUtils
           .getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
 
+      Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
       if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
             + "concurrent executions are disabled for this flow.", flowGroup, flowName);
@@ -255,7 +257,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         Instrumented.markMeter(this.skippedFlowsMeter);
         if (!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
           // For ad-hoc flow, we might already increase quota, we need to decrease here
-          Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
           for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
             quotaManager.releaseQuota(dagNode);
           }
@@ -274,8 +275,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       Optional<TimingEvent> flowCompilationTimer = this.eventSubmitter.transform(submitter ->
           new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
 
-      Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
       Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
       if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
         // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index ab82ec937..4a31a81bd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -631,6 +631,8 @@ public class MultiHopFlowCompilerTest {
     String jobName2 = jobConfig2.getString(ConfigurationKeys.JOB_NAME_KEY);
     Assert.assertTrue(jobName2.startsWith(retentionJobName));
     Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+    // Should be empty since compilation was successful
+    Assert.assertEquals(spec.getCompilationErrors().size(), 0);
   }
 
   @Test (dependsOnMethods = "testCompileCombinedDatasetFlow")