You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/28 18:12:10 UTC
[gobblin] branch master updated: [GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's … (#3650)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eb809747e [GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's … (#3650)
eb809747e is described below
commit eb809747e265ac3b4da846a1d83c4cc850e5ae95
Author: William Lo <lo...@gmail.com>
AuthorDate: Tue Feb 28 10:12:03 2023 -0800
[GOBBLIN-1791] Prevent the adding of flowspec compilation errors to the scheduler's added specs (#3650)
* Prevent the adding of flowspec compilation errors to the scheduler's added specs
* Add test asserting that a spec has no compilation errors after a successful compilation
---
.../java/org/apache/gobblin/runtime/api/FlowSpec.java | 4 ++++
.../service/modules/flow/MultiHopFlowCompiler.java | 15 +++++++--------
.../service/modules/orchestration/Orchestrator.java | 7 +++----
.../service/modules/flow/MultiHopFlowCompilerTest.java | 2 ++
4 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 85e7a269b..bb9b6de3f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -136,6 +136,10 @@ public class FlowSpec implements Configurable, Spec {
this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
}
+ public void clearCompilationErrors() {
+ this.compilationErrors.clear();
+ }
+
@EqualsAndHashCode
public static class CompilationError {
public int errorPriority;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d4ee50729..0f5bcf9fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -33,11 +33,8 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.hadoop.fs.Path;
-
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.slf4j.Logger;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -53,6 +50,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
@@ -60,20 +58,20 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
-import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
+import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.Logger;
/***
@@ -286,7 +284,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
}
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
+ // Clear compilation errors now that compilation is successful
+ flowSpec.clearCompilationErrors();
return jobExecutionPlanDag;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7dbc55011..c9a21560c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -27,7 +27,6 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +71,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -248,6 +248,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
boolean allowConcurrentExecution = ConfigUtils
.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
@@ -255,7 +257,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
Instrumented.markMeter(this.skippedFlowsMeter);
if (!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// For ad-hoc flow, we might already increase quota, we need to decrease here
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
@@ -274,8 +275,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
Optional<TimingEvent> flowCompilationTimer = this.eventSubmitter.transform(submitter ->
new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
// For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index ab82ec937..4a31a81bd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -631,6 +631,8 @@ public class MultiHopFlowCompilerTest {
String jobName2 = jobConfig2.getString(ConfigurationKeys.JOB_NAME_KEY);
Assert.assertTrue(jobName2.startsWith(retentionJobName));
Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+ // Should be empty since compilation was successful
+ Assert.assertEquals(spec.getCompilationErrors().size(), 0);
}
@Test (dependsOnMethods = "testCompileCombinedDatasetFlow")