You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/09/22 19:58:16 UTC
incubator-gobblin git commit: [GOBBLIN-262] Make multihop compiler use the first template specified by user
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 6e4a2cecb -> 99fcd7f3c
[GOBBLIN-262] Make multihop compiler use the first template specified by user
Closes #2114 from yukuai518/gaas
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/99fcd7f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/99fcd7f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/99fcd7f3
Branch: refs/heads/master
Commit: 99fcd7f3c4ad7ca698f3aefe1b51a7a6d1b4b8ec
Parents: 6e4a2ce
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Sep 22 12:58:10 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Sep 22 12:58:10 2017 -0700
----------------------------------------------------------------------
.../flow/MultiHopsFlowToJobSpecCompiler.java | 45 ++++++++++++++++----
1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99fcd7f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 456f3a3..96d41e5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -30,11 +30,14 @@ import java.util.stream.Collectors;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.policy.ServicePolicy;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.jgrapht.graph.DirectedWeightedMultigraph;
import org.slf4j.Logger;
import org.apache.gobblin.runtime.api.FlowEdge;
@@ -188,7 +191,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
FlowEdge tmpFlowEdge = resultEdgePath.get(i);
ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getSourceNode();
ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getTargetNode();
- specExecutorInstanceMap.put(jobSpecGenerator(edgeSrcNode, edgeTgtNode, flowSpec),
+ specExecutorInstanceMap.put(convertHopToJobSpec(edgeSrcNode, edgeTgtNode, flowSpec),
((LoadBasedFlowEdgeImpl) (resultEdgePath.get(i))).getSpecExecutorInstance());
}
}
@@ -217,7 +220,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i + 1));
if (weightedGraph.containsVertex(sourceNode) && weightedGraph.containsVertex(targetNode)
&& weightedGraph.containsEdge(sourceNode, targetNode)) {
- tmpSpecExecutorInstanceMap.put(jobSpecGenerator(sourceNode, targetNode, flowSpec),
+ tmpSpecExecutorInstanceMap.put(convertHopToJobSpec(sourceNode, targetNode, flowSpec),
(((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, targetNode)).getSpecExecutorInstance()));
} else {
log.error("User Specified Path is invalid");
@@ -261,15 +264,13 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
/**
* Generate JobSpec based on the #templateURI that user specified.
*/
- private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowEdge flowEdge, URI templateURI,
- FlowSpec flowSpec) {
+ private JobSpec buildJobSpec (ServiceNode sourceNode, ServiceNode targetNode, URI templateURI, FlowSpec flowSpec) {
JobSpec jobSpec;
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode))
.withConfig(flowSpec.getConfig())
.withDescription(flowSpec.getDescription())
.withVersion(flowSpec.getVersion());
- if (edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity()) && edgeTemplateMap.get(flowEdge.getEdgeIdentity())
- .contains(templateURI)) {
+ if (templateURI != null) {
jobSpecBuilder.withTemplate(templateURI);
try {
jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
@@ -281,6 +282,27 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
jobSpec = jobSpecBuilder.build();
log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
}
+
+ // Remove schedule
+ jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+ // Add job.name and job.group
+ if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
+ jobSpec.setConfig(jobSpec.getConfig()
+ .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY)));
+ }
+ if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
+ jobSpec.setConfig(jobSpec.getConfig()
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
+ }
+
+ // Add flow execution id for this compilation
+ long flowExecutionId = System.currentTimeMillis();
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+ // Reset properties in Spec from Config
+ jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
return jobSpec;
}
@@ -289,12 +311,17 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
* Handle the case when edge is not specified.
* Always select the first available template.
*/
- private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) {
+ private JobSpec convertHopToJobSpec (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) {
FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, targetNode).iterator().next();
+ URI templateURI = getTemplateURI (sourceNode, targetNode, flowSpec, flowEdge);
+ return buildJobSpec(sourceNode, targetNode, templateURI, flowSpec);
+ }
+
+ private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec, FlowEdge flowEdge) {
URI firstTemplateURI =
(edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
- flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getUri();
- return this.jobSpecGenerator(sourceNode, targetNode, flowEdge, firstTemplateURI, flowSpec);
+ flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getTemplateURI().orNull();
+ return firstTemplateURI;
}
/**