You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/05 20:35:17 UTC
incubator-gobblin git commit: [GOBBLIN-276] Change setActive order to
prevent flow spec loss
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 312e768f5 -> bd17f1384
[GOBBLIN-276] Change setActive order to prevent flow spec loss
Closes #2129 from yukuai518/topologyOrder
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bd17f138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bd17f138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bd17f138
Branch: refs/heads/master
Commit: bd17f1384736a70a26a0808ca406608436373812
Parents: 312e768
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Oct 5 13:35:11 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Oct 5 13:35:11 2017 -0700
----------------------------------------------------------------------
.../runtime/spec_catalog/TopologyCatalog.java | 5 +++++
.../modules/core/GobblinServiceManager.java | 15 ++++++++++++++-
.../modules/flow/BaseFlowToJobSpecCompiler.java | 6 ++++++
.../modules/orchestration/Orchestrator.java | 5 +++--
.../scheduler/GobblinServiceJobScheduler.java | 17 +++++++++++------
5 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 2122014..c6e02d2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -23,7 +23,10 @@ import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
import javax.annotation.Nonnull;
+import lombok.Getter;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -64,6 +67,8 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
protected final MetricContext metricContext;
protected final TopologyCatalog.StandardMetrics metrics;
protected final SpecStore specStore;
+ @Getter
+ protected CountDownLatch initComplete = new CountDownLatch(1);
private final ClassAliasResolver<SpecStore> aliasResolver;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 598371d..a13ed28 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -223,7 +223,6 @@ public class GobblinServiceManager implements ApplicationLauncher {
// Register Scheduler to listen to changes in Flows
if (isSchedulerEnabled) {
this.flowCatalog.addListener(this.scheduler);
- this.topologyCatalog.addListener(this.orchestrator);
}
// Initialize TopologySpecFactory
@@ -356,12 +355,22 @@ public class GobblinServiceManager implements ApplicationLauncher {
}
// Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
+ // This has to be done after the topologyCatalog service is launched
if (this.isTopologySpecFactoryEnabled) {
Collection<TopologySpec> topologySpecs = this.topologySpecFactory.getTopologies();
for (TopologySpec topologySpec : topologySpecs) {
this.topologyCatalog.put(topologySpec);
}
}
+
+ // Register Orchestrator to listen to changes in topology
+ // This has to be done after topologySpecFactory has updated spec store, so that listeners will have the latest updates.
+ if (isSchedulerEnabled) {
+ this.topologyCatalog.addListener(this.orchestrator);
+ }
+
+ // Notify now topologyCatalog has the right information
+ this.topologyCatalog.getInitComplete().countDown();
}
@Override
@@ -479,6 +488,7 @@ public class GobblinServiceManager implements ApplicationLauncher {
public HelixTaskResult handleMessage() throws InterruptedException {
if (jobScheduler.isActive()) {
String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+ LOGGER.info ("ControllerUserDefinedMessage received : {}, type {}", flowSpecUri, _message.getMsgSubType());
try {
if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) {
Spec spec = flowCatalog.getSpec(new URI(flowSpecUri));
@@ -493,6 +503,9 @@ public class GobblinServiceManager implements ApplicationLauncher {
} catch (SpecNotFoundException | URISyntaxException e) {
LOGGER.error("Cannot process Helix message for flowSpecUri: " + flowSpecUri, e);
}
+ } else {
+ String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+ LOGGER.info ("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", flowSpecUri, _message.getMsgSubType());
}
HelixTaskResult helixTaskResult = new HelixTaskResult();
helixTaskResult.setSuccess(true);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 26f4463..db92ef9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -149,6 +149,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
@Override
public synchronized void onAddSpec(Spec addedSpec) {
+ TopologySpec spec = (TopologySpec) addedSpec;
+ log.info ("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
+ log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ }
+
topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 261ce6e..286911b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -138,8 +138,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
/** {@inheritDoc} */
@Override
public void onAddSpec(Spec addedSpec) {
- _log.info("New Spec detected: " + addedSpec);
-
if (addedSpec instanceof TopologySpec) {
_log.info("New Spec detected of type TopologySpec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
@@ -178,6 +176,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
}
public void orchestrate(Spec spec) throws Exception {
+ // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time
+ this.topologyCatalog.get().getInitComplete().await();
+
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a625f36..0eabf2c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -105,6 +105,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
// Since we are going to change status to isActive=true, schedule all flows
if (isActive) {
+ // Need to set active first; otherwise in the STANDBY->ACTIVE transition,
+ // the onAddSpec will forward specs to the leader, which is itself.
+ this.isActive = isActive;
if (this.flowCatalog.isPresent()) {
Collection<Spec> specs = this.flowCatalog.get().getSpecs();
for (Spec spec : specs) {
@@ -117,11 +120,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
for (Spec spec : this.scheduledFlowSpecs.values()) {
onDeleteSpec(spec.getUri(), spec.getVersion());
}
+ // Need to set active at the end; otherwise in the ACTIVE->STANDBY transition,
+ // the onDeleteSpec will forward specs to the leader, which is itself.
+ this.isActive = isActive;
}
-
- // Change status after invoking addition / removal of specs, or else they will use isActive
- // .. to exhibit behavior for updated iActive value
- this.isActive = isActive;
}
@Override
@@ -129,8 +131,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
super.startUp();
}
+ /**
+ * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
+ */
@Override
- public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
+ public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
Map<String, Object> additionalJobDataMap = Maps.newHashMap();
additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC,
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)));
@@ -162,7 +167,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
return;
}
- _log.info("New Spec detected: " + addedSpec);
+ _log.info("New Flow Spec detected: " + addedSpec);
if (addedSpec instanceof FlowSpec) {
if (!isActive && helixManager.isPresent()) {