You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/04 05:12:27 UTC
incubator-gobblin git commit: [GOBBLIN-659] Ensure
MultiHopFlowCompiler is properly initialized before attempting flow
orchestration.[]
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 43c561c95 -> f861dca32
[GOBBLIN-659] Ensure MultiHopFlowCompiler is properly initialized before attempting flow orchestration.[]
Closes #2529 from sv2000/gaasStartUp
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f861dca3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f861dca3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f861dca3
Branch: refs/heads/master
Commit: f861dca3212e874d64433f573d73927a025e44a5
Parents: 43c561c
Author: suvasude <su...@linkedin.biz>
Authored: Thu Jan 3 21:12:22 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 3 21:12:22 2019 -0800
----------------------------------------------------------------------
.../modules/core/GitFlowGraphMonitor.java | 11 ++++++++--
.../modules/core/GobblinServiceManager.java | 4 ++++
.../modules/flow/BaseFlowToJobSpecCompiler.java | 9 ++++++++
.../modules/flow/MultiHopFlowCompiler.java | 23 +++++++++++++++++---
.../service/modules/flow/SpecCompiler.java | 19 +++++++++++++++-
.../modules/orchestration/Orchestrator.java | 3 +++
.../modules/core/GitFlowGraphMonitorTest.java | 3 ++-
.../modules/flow/MultiHopFlowCompilerTest.java | 2 ++
8 files changed, 67 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 54544a1..a0b53fe 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
@@ -90,12 +91,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
private FlowGraph flowGraph;
private final Map<URI, TopologySpec> topologySpecMap;
private final Config emptyConfig = ConfigFactory.empty();
+ private final CountDownLatch initComplete;
- public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) {
+ public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
this.flowGraph = graph;
this.topologySpecMap = topologySpecMap;
+ this.initComplete = initComplete;
}
/**
@@ -104,7 +107,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
*/
@Override
public boolean shouldPollGit() {
- return true;
+ return this.isActive;
}
/**
@@ -129,7 +132,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
return o1Depth.compareTo(o2Depth);
});
processGitConfigChangesHelper(changes);
+ //Decrements the latch count. The countdown latch is initialized to 1, so only the first call here releases any
+ // waiting threads; on every subsequent invocation countDown() is a no-op.
+ this.initComplete.countDown();
}
+
/**
* Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
* the {@link FlowGraph} for an added, updated or modified node or edge file.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 4091d21..a77833e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -87,6 +87,7 @@ import org.apache.gobblin.service.NoopRequesterService;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -447,6 +448,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
// Notify now topologyCatalog has the right information
this.topologyCatalog.getInitComplete().countDown();
+
+ //Activate the SpecCompiler, after the topologyCatalog has been initialized.
+ this.orchestrator.getSpecCompiler().setActive(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index adef5cc..0f51185 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -95,6 +95,9 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
protected Optional<Meter> flowCompilationFailedMeter;
@Getter
protected Optional<Timer> flowCompilationTimer;
+ @Getter
+ @Setter
+ protected boolean active;
public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
@@ -150,6 +153,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
}
@Override
+ public void awaitHealthy() throws InterruptedException {
+ //Do nothing
+ return;
+ }
+
+ @Override
public synchronized void onAddSpec(Spec addedSpec) {
TopologySpec spec = (TopologySpec) addedSpec;
log.info ("Loading topology {}", spec.toLongString());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index c1b3e84..01ed778 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -67,7 +68,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
@Getter
private ServiceManager serviceManager;
@Getter
- private boolean active;
+ private CountDownLatch initComplete = new CountDownLatch(1);
private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -94,7 +95,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
}
this.flowGraph = new BaseFlowGraph();
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap);
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
addShutdownHook();
//Start the git flow graph monitor
@@ -112,11 +113,27 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
this.flowGraph = flowGraph;
}
+ /**
+ * Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing it to start
+ * polling and processing changes.
+ * @param active true to activate the compiler and its {@link GitFlowGraphMonitor}, false to deactivate both.
+ */
+ @Override
public void setActive(boolean active) {
- this.active = active;
+ super.setActive(active);
this.gitFlowGraphMonitor.setActive(active);
}
+ @Override
+ public void awaitHealthy() throws InterruptedException {
+ if (this.getInitComplete().getCount() > 0) {
+ log.info("Waiting for the MultiHopFlowCompiler to become healthy..");
+ this.getInitComplete().await();
+ log.info("The MultihopFlowCompiler is healthy and ready to orchestrate flows.");
+ }
+ return;
+ }
+
/**
* j
* @param spec an instance of {@link FlowSpec}.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
index 53cdf83..ae17e7e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
@@ -38,7 +38,7 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
* Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
* and the mapping to {@link SpecExecutor} that they can be run on.
* All the specs generated from the compileFlow must have a
- * {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOW_EXECUTION_ID_KEY}
+ * {@value org.apache.gobblin.configuration.ConfigurationKeys#FLOW_EXECUTION_ID_KEY}
* @param spec {@link Spec} to compile.
* @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
*/
@@ -50,4 +50,21 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
* @return Map of {@link Spec} URI and {@link TopologySpec}
*/
Map<URI, TopologySpec> getTopologySpecMap();
+
+ /**
+ * Mark the {@link SpecCompiler} active/inactive. Useful to trigger the initialization of {@link SpecCompiler}, if
+ * necessary, before it can start compiling {@link org.apache.gobblin.runtime.api.FlowSpec}s.
+ * @param active true to mark the {@link SpecCompiler} active, false to mark it inactive.
+ */
+ void setActive(boolean active);
+
+ /**
+ * Waits for the {@link SpecCompiler} to become healthy. A {@link SpecCompiler} is healthy when all the component
+ * services it depends on have been successfully initialized. For instance, the {@link MultiHopFlowCompiler} is healthy
+ * when the {@link org.apache.gobblin.service.modules.flowgraph.DataNode}s and {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge}s
+ * can be added to the {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. The {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge}
+ * instantiation in turn depends on the successful initialization of {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}, which
+ * instantiates all the configured {@link SpecExecutor}s.
+ */
+ public void awaitHealthy() throws InterruptedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 3dd7538..fbc38e6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -197,6 +197,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
// Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time
this.topologyCatalog.get().getInitComplete().await();
+ //Wait for the SpecCompiler to become healthy.
+ this.getSpecCompiler().awaitHealthy();
+
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index ab8af70..13a452a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
@@ -122,7 +123,7 @@ public class GitFlowGraphMonitorTest {
//Create a FlowGraph instance with defaults
this.flowGraph = new BaseFlowGraph();
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap);
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
this.gitFlowGraphMonitor.setActive(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 6b51a77..5877e66 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -618,6 +618,8 @@ public class MultiHopFlowCompilerTest {
//Create a MultiHopFlowCompiler instance
specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false);
+ specCompiler.setActive(true);
+
//Ensure node1 is not present in the graph
Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));