You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/04 05:12:27 UTC

incubator-gobblin git commit: [GOBBLIN-659] Ensure MultiHopFlowCompiler is properly initialized before attempting flow orchestration.[]

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 43c561c95 -> f861dca32


[GOBBLIN-659] Ensure MultiHopFlowCompiler is properly initialized before attempting flow orchestration.[]

Closes #2529 from sv2000/gaasStartUp


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f861dca3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f861dca3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f861dca3

Branch: refs/heads/master
Commit: f861dca3212e874d64433f573d73927a025e44a5
Parents: 43c561c
Author: suvasude <su...@linkedin.biz>
Authored: Thu Jan 3 21:12:22 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Jan 3 21:12:22 2019 -0800

----------------------------------------------------------------------
 .../modules/core/GitFlowGraphMonitor.java       | 11 ++++++++--
 .../modules/core/GobblinServiceManager.java     |  4 ++++
 .../modules/flow/BaseFlowToJobSpecCompiler.java |  9 ++++++++
 .../modules/flow/MultiHopFlowCompiler.java      | 23 +++++++++++++++++---
 .../service/modules/flow/SpecCompiler.java      | 19 +++++++++++++++-
 .../modules/orchestration/Orchestrator.java     |  3 +++
 .../modules/core/GitFlowGraphMonitorTest.java   |  3 ++-
 .../modules/flow/MultiHopFlowCompilerTest.java  |  2 ++
 8 files changed, 67 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 54544a1..a0b53fe 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -90,12 +91,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
   private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
+  private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) {
+  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
     this.flowGraph = graph;
     this.topologySpecMap = topologySpecMap;
+    this.initComplete = initComplete;
   }
 
   /**
@@ -104,7 +107,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
    */
   @Override
   public boolean shouldPollGit() {
-    return true;
+    return this.isActive;
   }
 
   /**
@@ -129,7 +132,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
       return o1Depth.compareTo(o2Depth);
     });
     processGitConfigChangesHelper(changes);
+    // Decrement the latch count. The countdown latch is initialized to 1, so only the first decrement releases
+    // any waiting threads; every subsequent call to countDown() is a no-op.
+    this.initComplete.countDown();
   }
+
   /**
    * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
    * the {@link FlowGraph} for an added, updated or modified node or edge file.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 4091d21..a77833e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -87,6 +87,7 @@ import org.apache.gobblin.service.NoopRequesterService;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -447,6 +448,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
     // Notify now topologyCatalog has the right information
     this.topologyCatalog.getInitComplete().countDown();
+
+    //Activate the SpecCompiler, after the topologyCatalog has been initialized.
+    this.orchestrator.getSpecCompiler().setActive(true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index adef5cc..0f51185 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -95,6 +95,9 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   protected Optional<Meter> flowCompilationFailedMeter;
   @Getter
   protected Optional<Timer> flowCompilationTimer;
+  @Getter
+  @Setter
+  protected boolean active;
 
   public BaseFlowToJobSpecCompiler(Config config){
     this(config,true);
@@ -150,6 +153,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   }
 
   @Override
+  public void awaitHealthy() throws InterruptedException {
+    //Do nothing
+    return;
+  }
+
+  @Override
   public synchronized void onAddSpec(Spec addedSpec) {
     TopologySpec spec = (TopologySpec) addedSpec;
     log.info ("Loading topology {}", spec.toLongString());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index c1b3e84..01ed778 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -67,7 +68,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   @Getter
   private ServiceManager serviceManager;
   @Getter
-  private boolean active;
+  private CountDownLatch initComplete = new CountDownLatch(1);
 
   private GitFlowGraphMonitor gitFlowGraphMonitor;
 
@@ -94,7 +95,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
       throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
     }
     this.flowGraph = new BaseFlowGraph();
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap);
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
     this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
     addShutdownHook();
     //Start the git flow graph monitor
@@ -112,11 +113,27 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     this.flowGraph = flowGraph;
   }
 
+  /**
+   * Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing it to start polling
+   * and processing changes.
+   * @param active
+   */
+  @Override
   public void setActive(boolean active) {
-    this.active = active;
+    super.setActive(active);
     this.gitFlowGraphMonitor.setActive(active);
   }
 
+  @Override
+  public void awaitHealthy() throws InterruptedException {
+    if (this.getInitComplete().getCount() > 0) {
+      log.info("Waiting for the MultiHopFlowCompiler to become healthy..");
+      this.getInitComplete().await();
+      log.info("The MultihopFlowCompiler is healthy and ready to orchestrate flows.");
+    }
+    return;
+  }
+
   /**
    * j
    * @param spec an instance of {@link FlowSpec}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
index 53cdf83..ae17e7e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
@@ -38,7 +38,7 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
    * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
    * and the mapping to {@link SpecExecutor} that they can be run on.
    * All the specs generated from the compileFlow must have a
-   * {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOW_EXECUTION_ID_KEY}
+   * {@value org.apache.gobblin.configuration.ConfigurationKeys#FLOW_EXECUTION_ID_KEY}
    * @param spec {@link Spec} to compile.
    * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
    */
@@ -50,4 +50,21 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
    * @return Map of {@link Spec} URI and {@link TopologySpec}
    */
   Map<URI, TopologySpec> getTopologySpecMap();
+
+  /**
+   * Mark the {@link SpecCompiler} active/inactive. Useful to trigger the initialization of {@link SpecCompiler}, if
+   * necessary, before it can start compiling {@link org.apache.gobblin.runtime.api.FlowSpec}s.
+   * @param active
+   */
+  void setActive(boolean active);
+
+  /**
+   * Waits for the {@link SpecCompiler} to become healthy. A {@link SpecCompiler} is healthy when all the component
+   * services it depends on have been successfully initialized. For instance, the {@link MultiHopFlowCompiler} is healthy
+   * when the {@link org.apache.gobblin.service.modules.flowgraph.DataNode}s and {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge}s
+   * can be added to the {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. The {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge}
+   * instantiation in turn depends on the successful initialization of {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}, which
+   * instantiates all the configured {@link SpecExecutor}s.
+   */
+  public void awaitHealthy() throws InterruptedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 3dd7538..fbc38e6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -197,6 +197,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time
     this.topologyCatalog.get().getInitComplete().await();
 
+    //Wait for the SpecCompiler to become healthy.
+    this.getSpecCompiler().awaitHealthy();
+
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
       TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index ab8af70..13a452a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.io.FileUtils;
@@ -122,7 +123,7 @@ public class GitFlowGraphMonitorTest {
     //Create a FlowGraph instance with defaults
     this.flowGraph = new BaseFlowGraph();
 
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap);
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
     this.gitFlowGraphMonitor.setActive(true);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 6b51a77..5877e66 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -618,6 +618,8 @@ public class MultiHopFlowCompilerTest {
     //Create a MultiHopFlowCompiler instance
     specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false);
 
+    specCompiler.setActive(true);
+
     //Ensure node1 is not present in the graph
     Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));