You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/05 20:35:17 UTC

incubator-gobblin git commit: [GOBBLIN-276] Change setActive order to prevent flow spec loss

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 312e768f5 -> bd17f1384


[GOBBLIN-276] Change setActive order to prevent flow spec loss

Closes #2129 from yukuai518/topologyOrder


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bd17f138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bd17f138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bd17f138

Branch: refs/heads/master
Commit: bd17f1384736a70a26a0808ca406608436373812
Parents: 312e768
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Oct 5 13:35:11 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Oct 5 13:35:11 2017 -0700

----------------------------------------------------------------------
 .../runtime/spec_catalog/TopologyCatalog.java      |  5 +++++
 .../modules/core/GobblinServiceManager.java        | 15 ++++++++++++++-
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  6 ++++++
 .../modules/orchestration/Orchestrator.java        |  5 +++--
 .../scheduler/GobblinServiceJobScheduler.java      | 17 +++++++++++------
 5 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 2122014..c6e02d2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -23,7 +23,10 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import javax.annotation.Nonnull;
+import lombok.Getter;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -64,6 +67,8 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
   protected final MetricContext metricContext;
   protected final TopologyCatalog.StandardMetrics metrics;
   protected final SpecStore specStore;
+  @Getter
+  protected CountDownLatch initComplete = new CountDownLatch(1);
 
   private final ClassAliasResolver<SpecStore> aliasResolver;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 598371d..a13ed28 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -223,7 +223,6 @@ public class GobblinServiceManager implements ApplicationLauncher {
     // Register Scheduler to listen to changes in Flows
     if (isSchedulerEnabled) {
       this.flowCatalog.addListener(this.scheduler);
-      this.topologyCatalog.addListener(this.orchestrator);
     }
 
     // Initialize TopologySpecFactory
@@ -356,12 +355,22 @@ public class GobblinServiceManager implements ApplicationLauncher {
     }
 
     // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
+    // This has to be done after the topologyCatalog service is launched
     if (this.isTopologySpecFactoryEnabled) {
       Collection<TopologySpec> topologySpecs = this.topologySpecFactory.getTopologies();
       for (TopologySpec topologySpec : topologySpecs) {
         this.topologyCatalog.put(topologySpec);
       }
     }
+
+    // Register Orchestrator to listen to changes in topology
+    // This has to be done after topologySpecFactory has updated spec store, so that listeners will have the latest updates.
+    if (isSchedulerEnabled) {
+      this.topologyCatalog.addListener(this.orchestrator);
+    }
+
+    // Signal waiters that topologyCatalog is now fully populated and its listeners are registered
+    this.topologyCatalog.getInitComplete().countDown();
   }
 
   @Override
@@ -479,6 +488,7 @@ public class GobblinServiceManager implements ApplicationLauncher {
       public HelixTaskResult handleMessage() throws InterruptedException {
         if (jobScheduler.isActive()) {
           String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+          LOGGER.info ("ControllerUserDefinedMessage received : {}, type {}", flowSpecUri, _message.getMsgSubType());
           try {
             if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) {
               Spec spec = flowCatalog.getSpec(new URI(flowSpecUri));
@@ -493,6 +503,9 @@ public class GobblinServiceManager implements ApplicationLauncher {
           } catch (SpecNotFoundException | URISyntaxException e) {
             LOGGER.error("Cannot process Helix message for flowSpecUri: " + flowSpecUri, e);
           }
+        } else {
+          String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+          LOGGER.info ("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", flowSpecUri, _message.getMsgSubType());
         }
         HelixTaskResult helixTaskResult = new HelixTaskResult();
         helixTaskResult.setSuccess(true);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 26f4463..db92ef9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -149,6 +149,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
 
   @Override
   public synchronized void onAddSpec(Spec addedSpec) {
+    TopologySpec spec = (TopologySpec) addedSpec;
+    log.info ("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
+      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+    }
+
     topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 261ce6e..286911b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -138,8 +138,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   /** {@inheritDoc} */
   @Override
   public void onAddSpec(Spec addedSpec) {
-    _log.info("New Spec detected: " + addedSpec);
-
     if (addedSpec instanceof TopologySpec) {
       _log.info("New Spec detected of type TopologySpec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
@@ -178,6 +176,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   }
 
   public void orchestrate(Spec spec) throws Exception {
+    // Wait for TopologyCatalog initialization to complete, because the TopologyCatalog and FlowCatalog services can be launched at the same time
+    this.topologyCatalog.get().getInitComplete().await();
+
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
       Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a625f36..0eabf2c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -105,6 +105,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     // Since we are going to change status to isActive=true, schedule all flows
     if (isActive) {
+      // Need to set active first; otherwise in the STANDBY->ACTIVE transition,
+      // the onAddSpec will forward specs to the leader, which is itself.
+      this.isActive = isActive;
       if (this.flowCatalog.isPresent()) {
         Collection<Spec> specs = this.flowCatalog.get().getSpecs();
         for (Spec spec : specs) {
@@ -117,11 +120,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       for (Spec spec : this.scheduledFlowSpecs.values()) {
         onDeleteSpec(spec.getUri(), spec.getVersion());
       }
+      // Need to set active at the end; otherwise in the ACTIVE->STANDBY transition,
+      // the onDeleteSpec will forward specs to the leader, which is itself.
+      this.isActive = isActive;
     }
-
-    // Change status after invoking addition / removal of specs, or else they will use isActive
-    // .. to exhibit behavior for updated iActive value
-    this.isActive = isActive;
   }
 
   @Override
@@ -129,8 +131,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     super.startUp();
   }
 
+  /**
+   * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
+   */
   @Override
-  public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
+  public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
     Map<String, Object> additionalJobDataMap = Maps.newHashMap();
     additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC,
         this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)));
@@ -162,7 +167,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       return;
     }
 
-    _log.info("New Spec detected: " + addedSpec);
+    _log.info("New Flow Spec detected: " + addedSpec);
 
     if (addedSpec instanceof FlowSpec) {
       if (!isActive && helixManager.isPresent()) {