You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/09/22 19:58:16 UTC

incubator-gobblin git commit: [GOBBLIN-262] Make the multihop compiler use the first template specified by the user

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6e4a2cecb -> 99fcd7f3c


[GOBBLIN-262] Make the multihop compiler use the first template specified by the user

Closes #2114 from yukuai518/gaas


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/99fcd7f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/99fcd7f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/99fcd7f3

Branch: refs/heads/master
Commit: 99fcd7f3c4ad7ca698f3aefe1b51a7a6d1b4b8ec
Parents: 6e4a2ce
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Sep 22 12:58:10 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Sep 22 12:58:10 2017 -0700

----------------------------------------------------------------------
 .../flow/MultiHopsFlowToJobSpecCompiler.java    | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99fcd7f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 456f3a3..96d41e5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -30,11 +30,14 @@ import java.util.stream.Collectors;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.modules.policy.ServicePolicy;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.jgrapht.graph.DirectedWeightedMultigraph;
 import org.slf4j.Logger;
 import org.apache.gobblin.runtime.api.FlowEdge;
@@ -188,7 +191,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
       FlowEdge tmpFlowEdge = resultEdgePath.get(i);
       ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getSourceNode();
       ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getTargetNode();
-      specExecutorInstanceMap.put(jobSpecGenerator(edgeSrcNode, edgeTgtNode, flowSpec),
+      specExecutorInstanceMap.put(convertHopToJobSpec(edgeSrcNode, edgeTgtNode, flowSpec),
           ((LoadBasedFlowEdgeImpl) (resultEdgePath.get(i))).getSpecExecutorInstance());
     }
   }
@@ -217,7 +220,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
       ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i + 1));
       if (weightedGraph.containsVertex(sourceNode) && weightedGraph.containsVertex(targetNode)
           && weightedGraph.containsEdge(sourceNode, targetNode)) {
-        tmpSpecExecutorInstanceMap.put(jobSpecGenerator(sourceNode, targetNode, flowSpec),
+        tmpSpecExecutorInstanceMap.put(convertHopToJobSpec(sourceNode, targetNode, flowSpec),
             (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, targetNode)).getSpecExecutorInstance()));
       } else {
         log.error("User Specified Path is invalid");
@@ -261,15 +264,13 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
   /**
    * Generate JobSpec based on the #templateURI that user specified.
    */
-  private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowEdge flowEdge, URI templateURI,
-      FlowSpec flowSpec) {
+  private JobSpec buildJobSpec (ServiceNode sourceNode, ServiceNode targetNode, URI templateURI, FlowSpec flowSpec) {
     JobSpec jobSpec;
     JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode))
         .withConfig(flowSpec.getConfig())
         .withDescription(flowSpec.getDescription())
         .withVersion(flowSpec.getVersion());
-    if (edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity()) && edgeTemplateMap.get(flowEdge.getEdgeIdentity())
-        .contains(templateURI)) {
+    if (templateURI != null) {
       jobSpecBuilder.withTemplate(templateURI);
       try {
         jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
@@ -281,6 +282,27 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
       jobSpec = jobSpecBuilder.build();
       log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
     }
+
+    // Remove schedule
+    jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+    // Add job.name and job.group
+    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
+      jobSpec.setConfig(jobSpec.getConfig()
+          .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY)));
+    }
+    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
+      jobSpec.setConfig(jobSpec.getConfig()
+          .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
+    }
+
+    // Add flow execution id for this compilation
+    long flowExecutionId = System.currentTimeMillis();
+    jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+        ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+    // Reset properties in Spec from Config
+    jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
     return jobSpec;
   }
 
@@ -289,12 +311,17 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
    * Handle the case when edge is not specified.
    * Always select the first available template.
    */
-  private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) {
+  private JobSpec convertHopToJobSpec (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) {
     FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, targetNode).iterator().next();
+    URI templateURI = getTemplateURI (sourceNode, targetNode, flowSpec, flowEdge);
+    return buildJobSpec(sourceNode, targetNode, templateURI, flowSpec);
+  }
+
+  private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec, FlowEdge flowEdge) {
     URI firstTemplateURI =
         (edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
-            flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getUri();
-    return this.jobSpecGenerator(sourceNode, targetNode, flowEdge, firstTemplateURI, flowSpec);
+            flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getTemplateURI().orNull();
+    return firstTemplateURI;
   }
 
   /**