You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/07 17:01:10 UTC

incubator-gobblin git commit: [GOBBLIN-646] Refactor MultiHopFlowCompiler to use SpecExecutor configs from TopologySpecMap.[]

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b2dd7ddfc -> ccaa02c6e


[GOBBLIN-646] Refactor MultiHopFlowCompiler to use SpecExecutor configs from TopologySpecMap.[]

Closes #2516 from sv2000/specExecutorsRefactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccaa02c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccaa02c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccaa02c6

Branch: refs/heads/master
Commit: ccaa02c6e132a5731706a61620f04abd89be75fe
Parents: b2dd7dd
Author: suvasude <su...@linkedin.biz>
Authored: Fri Dec 7 09:01:05 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Dec 7 09:01:05 2018 -0800

----------------------------------------------------------------------
 .../modules/core/GitFlowGraphMonitor.java       | 39 ++++++++++++--
 .../modules/flow/MultiHopFlowCompiler.java      |  2 +-
 .../service/modules/flowgraph/BaseFlowEdge.java | 27 ++++------
 .../modules/flowgraph/FlowEdgeFactory.java      |  7 ++-
 .../pathfinder/AbstractPathFinder.java          |  9 ++--
 .../modules/core/GitFlowGraphMonitorTest.java   | 19 ++++---
 .../modules/flow/MultiHopFlowCompilerTest.java  | 54 ++++++++++++++++++--
 .../flowgraph/BaseFlowEdgeFactoryTest.java      | 21 +++++---
 .../adls-1-to-adls-1-retention-1.properties     |  6 +--
 .../adls-1-to-adls-1-retention-2.properties     |  6 +--
 .../hdfs-1-to-hdfs-1-encrypt.properties         |  6 +--
 .../hdfs-1-to-hdfs-1-retention.properties       |  6 +--
 .../flowedges/hdfs-1-to-hdfs-3.properties       |  6 +--
 .../hdfs-2-hdfs-2-retention.properties          |  6 +--
 .../hdfs-2-to-hdfs-2-encrypt.properties         |  6 +--
 .../flowedges/hdfs-2-to-hdfs-4.properties       |  6 +--
 .../flowedges/hdfs-3-to-adls-1.properties       |  7 +--
 .../hdfs-3-to-hdfs-3-retention.properties       |  6 +--
 .../flowedges/hdfs-4-to-adls-1.properties       |  6 +--
 .../hdfs-4-to-hdfs-4-retention.properties       |  6 +--
 .../flowedges/local-to-hdfs-1.properties        |  6 +--
 .../flowedges/local-to-hdfs-2.properties        |  6 +--
 .../local-to-local-retention.properties         |  6 +--
 .../jobs/hdfs-encrypt-avro-to-json.job          |  3 ++
 .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job       |  5 +-
 .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job     |  3 ++
 .../localToHdfs/jobs/distcp-local-to-hdfs.job   |  3 ++
 .../distcp-push-hdfs-to-adl.template            | 11 +---
 .../multihop/jobTemplates/distcp.template       | 11 +---
 .../topologyspec_catalog/azkaban01.properties   |  2 +
 .../topologyspec_catalog/azkaban02.properties   |  2 +
 .../topologyspec_catalog/azkaban03.properties   |  2 +
 .../topologyspec_catalog/azkaban04.properties   |  2 +
 .../topologyspec_catalog/local01.properties     |  2 +
 .../testExecutor1.properties                    |  3 ++
 .../testExecutor2.properties                    |  3 ++
 36 files changed, 174 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index ed16c39..54544a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -18,8 +18,12 @@
 package org.apache.gobblin.service.modules.core;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +40,8 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
@@ -82,12 +88,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
 
   private FSFlowCatalog flowCatalog;
   private FlowGraph flowGraph;
+  private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
 
-  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) {
+  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
     this.flowGraph = graph;
+    this.topologySpecMap = topologySpecMap;
   }
 
   /**
@@ -203,11 +211,12 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
     if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) {
       Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath());
       try {
-        Config config = loadEdgeFileWithOverrides(edgeFilePath);
-        Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+        Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
+        List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
+        Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
-        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
-        FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog);
+        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
+        FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors);
         if (!this.flowGraph.addFlowEdge(edge)) {
           log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
         } else {
@@ -304,12 +313,32 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
     String source = edgeFilePath.getParent().getParent().getName();
     String destination = edgeFilePath.getParent().getName();
     String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName());
+
     return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
         .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination))
         .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
   }
 
   /**
+   * This method first retrieves the logical names of all the {@link org.apache.gobblin.runtime.api.SpecExecutor}s
+   * for this edge and then returns the corresponding SpecExecutors from the {@link TopologySpec} map.
+   * @param edgeConfig containing the logical names of SpecExecutors for this edge.
+   * @return a {@link List} of {@link SpecExecutor}s for this edge.
+   */
+  private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
+      throws URISyntaxException {
+    //Get the logical names of SpecExecutors where the FlowEdge can be executed.
+    List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+    //Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog.
+    List<SpecExecutor> specExecutors = new ArrayList<>();
+    for (String specExecutorName: specExecutorNames) {
+      URI specExecutorUri = new URI(specExecutorName);
+      specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
+    }
+    return specExecutors;
+  }
+
+  /**
    * Load the node file.
    * @param filePath path of the node file relative to the repository root
    * @return the configuration object

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 50da32a..c1b3e84 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -94,7 +94,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
       throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
     }
     this.flowGraph = new BaseFlowGraph();
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap);
     this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
     addShutdownHook();
     //Start the git flow graph monitor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 56f6c1b..250fd57 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,23 +17,32 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import joptsimple.internal.Strings;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.PullFileLoader;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -128,7 +137,7 @@ public class BaseFlowEdge implements FlowEdge {
      * @return a {@link BaseFlowEdge}
      */
     @Override
-    public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
+    public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException {
       try {
         String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
@@ -138,30 +147,16 @@ public class BaseFlowEdge implements FlowEdge {
         String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a non-null or empty Id");
 
-        List<Config> specExecutorConfigList = new ArrayList<>();
-        boolean flag;
-        for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) {
-          specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
-        }
-
         String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
 
         //Perform basic validation
         Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
         Preconditions
-            .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
+            .checkArgument(specExecutors.size() > 0, "A FlowEdge must have at least one SpecExecutor");
         Preconditions
             .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
         boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
 
-        //Build SpecExecutor from config
-        List<SpecExecutor> specExecutors = new ArrayList<>();
-
-        for (Config specExecutorConfig : specExecutorConfigList) {
-          Class executorClass = Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY));
-          SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
-          specExecutors.add(executor);
-        }
         FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
         return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
       } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 2977231..3744bf0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -17,20 +17,23 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
+import java.util.List;
+
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 
 public interface FlowEdgeFactory {
   /**
    * Construct a {@link FlowEdge} from the edge properties
    * @param edgeProps properties of the {@link FlowEdge}
-   * @param catalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
+   * @param flowCatalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
    *               useful for creating a {@link FlowEdge}.
    * @return an instance of {@link FlowEdge}
    * @throws FlowEdgeCreationException
    */
-  public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
+  public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
 
   public class FlowEdgeCreationException extends Exception {
     private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index f58a6d8..918f4a6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -181,7 +181,7 @@ public abstract class AbstractPathFinder implements PathFinder {
         boolean foundExecutor = false;
         //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
         for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
-          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          Config mergedConfig = getMergedConfig(flowEdge);
           List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
               flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
           for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
@@ -285,21 +285,18 @@ public abstract class AbstractPathFinder implements PathFinder {
    * <ul>
    *   <p> the user provided flow config </p>
    *   <p> edge specific properties/overrides </p>
-   *   <p> spec executor config/overrides </p>
    *   <p> source node config </p>
    *   <p> destination node config </p>
    * </ul>
    * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
    * @param flowEdge An instance of {@link FlowEdge}.
-   * @param specExecutor A {@link SpecExecutor}.
    * @return the merged config derived as described above.
    */
-  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+  private Config getMergedConfig(FlowEdge flowEdge)
       throws ExecutionException, InterruptedException {
     Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
     Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
-    Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
-        .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+    Config mergedConfig = flowConfig.withFallback(flowEdge.getConfig()).withFallback(srcNodeConfig).withFallback(destNodeConfig);
     return mergedConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index bd7d3cf..ab8af70 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -50,12 +51,14 @@ import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 
 
 public class GitFlowGraphMonitorTest {
@@ -96,6 +99,9 @@ public class GitFlowGraphMonitorTest {
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
+    URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+    Map<URI, TopologySpec> topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
     this.config = ConfigBuilder.create()
         .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
             + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
@@ -116,7 +122,7 @@ public class GitFlowGraphMonitorTest {
     //Create a FlowGraph instance with defaults
     this.flowGraph = new BaseFlowGraph();
 
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap);
     this.gitFlowGraphMonitor.setActive(true);
   }
 
@@ -325,14 +331,7 @@ public class GitFlowGraphMonitorTest {
         + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n"
         + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
         + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "=testExecutor1,testExecutor2\n"
         + "key1=" + value + "\n";
     return fileContents;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 54792f9..6b51a77 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -25,8 +25,10 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -38,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.lib.Repository;
@@ -68,6 +71,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
@@ -83,6 +87,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
@@ -116,9 +121,11 @@ public class MultiHopFlowCompilerTest {
       }
     }
 
+    URI specExecutorCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+    Map<URI, TopologySpec> topologySpecMap = buildTopologySpecMap(specExecutorCatalogUri);
+
     //Create a FSFlowCatalog instance
     URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    // Create a FSFlowCatalog instance
     Properties properties = new Properties();
     properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
     Config config = ConfigFactory.parseProperties(properties);
@@ -127,7 +134,6 @@ public class MultiHopFlowCompilerTest {
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
 
-
     //Add FlowEdges from the edge properties files
     URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
     fs = FileSystem.get(flowEdgesURI, new Configuration());
@@ -139,13 +145,55 @@ public class MultiHopFlowCompilerTest {
         Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
         FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
-        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+        List<String> specExecutorNames = ConfigUtils.getStringList(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+        List<SpecExecutor> specExecutors = new ArrayList<>();
+        for (String specExecutorName: specExecutorNames) {
+          specExecutors.add(topologySpecMap.get(new URI(specExecutorName)).getSpecExecutor());
+        }
+        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog, specExecutors);
         this.flowGraph.addFlowEdge(edge);
       }
     }
     this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
   }
 
+  /**
+   * A helper method to return a {@link TopologySpec} map, given a {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}.
+   * @param topologyCatalogUri pointing to the location of the {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}
+   * @return a {@link TopologySpec} map.
+   */
+  public static Map<URI, TopologySpec> buildTopologySpecMap(URI topologyCatalogUri)
+      throws IOException, URISyntaxException, ReflectiveOperationException {
+    FileSystem fs = FileSystem.get(topologyCatalogUri, new Configuration());
+    PathFilter extensionFilter = file -> {
+      for (String extension : Lists.newArrayList(".properties")) {
+        if (file.getName().endsWith(extension)) {
+          return true;
+        }
+      }
+      return false;
+    };
+
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    for (FileStatus fileStatus : fs.listStatus(new Path(topologyCatalogUri.getPath()), extensionFilter)) {
+      URI topologySpecUri = new URI(Files.getNameWithoutExtension(fileStatus.getPath().getName()));
+      Config topologyConfig = ConfigFactory.parseFile(new File(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+      Class specExecutorClass = Class.forName(topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_KEY));
+      SpecExecutor specExecutor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(specExecutorClass, topologyConfig);
+
+      TopologySpec.Builder topologySpecBuilder = TopologySpec
+          .builder(topologySpecUri)
+          .withConfig(topologyConfig)
+          .withDescription("")
+          .withVersion("1")
+          .withSpecExecutor(specExecutor);
+
+      TopologySpec topologySpec = topologySpecBuilder.build();
+      topologySpecMap.put(topologySpecUri, topologySpec);
+    }
+    return topologySpecMap;
+  }
+
   private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination, boolean applyRetention, boolean applyRetentionOnInput)
       throws IOException, URISyntaxException {
     //Create a flow spec

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
index 2694f5c..085d0a7 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -18,14 +18,19 @@
 package org.apache.gobblin.service.modules.flowgraph;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -43,12 +48,14 @@ public class BaseFlowEdgeFactoryTest {
     properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
     properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
     properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
+
+    List<SpecExecutor> specExecutorList = new ArrayList<>();
+    Config config1 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp1")).
+        withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s1:d1"));
+    specExecutorList.add(new InMemorySpecExecutor(config1));
+    Config config2 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp2")).
+        withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s2:d2"));
+    specExecutorList.add(new InMemorySpecExecutor(config2));
 
     FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
 
@@ -61,7 +68,7 @@ public class BaseFlowEdgeFactoryTest {
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
     Config edgeProps = ConfigUtils.propertiesToConfig(properties);
-    FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
+    FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog, specExecutorList);
     Assert.assertEquals(flowEdge.getSrc(), "node1");
     Assert.assertEquals(flowEdge.getDest(), "node2");
     Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
index 52079d1..b01722c 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
@@ -2,8 +2,4 @@ flow.edge.source=ADLS-1
 flow.edge.destination=ADLS-1
 flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban03
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
index 7b1a160..def8cb9 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
@@ -2,8 +2,4 @@ flow.edge.source=ADLS-1
 flow.edge.destination=ADLS-1
 flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban04
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index 44d3c44..110c665 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-1
 flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban01

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
index 926c51e..8e5b5ef 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-1
 flow.edge.id=HDFS-1:HDFS-1:hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban01
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 897f003..5abd015 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -2,9 +2,5 @@ flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-3
 flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban01
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
index 26454e7..8b54a22 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-2
 flow.edge.id=HDFS-2:HDFS-2:hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban02
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index db0ea48..24339ec 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-2
 flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban02

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 44f0408..268a414 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-4
 flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban02

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
index 68ca0ca..3471277 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -2,12 +2,7 @@ flow.edge.source=HDFS-3
 flow.edge.destination=ADLS-1
 flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
-
+flow.edge.specExecutors=azkaban03
 # Proxy config
 flow.edge.proxy.host=adl-proxy.linkedin.com
 flow.edge.proxy.port=1234
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
index f390546..f1d1adf 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-3
 flow.edge.destination=HDFS-3
 flow.edge.id=HDFS-3:HDFS-3:hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban03
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
index 04f0885..9a08893 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -2,11 +2,7 @@ flow.edge.source=HDFS-4
 flow.edge.destination=ADLS-1
 flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban04
 
 # Proxy config
 flow.edge.proxy.host=adl-proxy.linkedin.com

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
index 6afb3d8..0a005c8 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-4
 flow.edge.destination=HDFS-4
 flow.edge.id=HDFS-4:HDFS-4:hdfsRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban04
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
index 268b67f..8a3809e 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
 flow.edge.destination=HDFS-1
 flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
+flow.edge.specExecutors=local01

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
index bc67810..626a12f 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
 flow.edge.destination=HDFS-2
 flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
+flow.edge.specExecutors=local01
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
index 76c77fd..c1f17eb 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
 flow.edge.destination=LocalFS-1
 flow.edge.id=LocalFS-1:LocalFS-1:localRetention
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
+flow.edge.specExecutors=local01
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
index e0c8fa4..05fa8cd 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
@@ -1 +1,4 @@
 gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
index 429d4c0..779044b 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
@@ -1 +1,4 @@
-gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
index 2d1672c..13c067a 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
@@ -1 +1,4 @@
 gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
index 2d1672c..823b0a4 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
@@ -1 +1,4 @@
 gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"
+job.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+launcher.type=LOCAL
+type=java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
index 26b2ed1..0b664dd 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
@@ -52,14 +52,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource"
 writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
 converter.classes="org.apache.gobblin.converter.IdentityConverter"
 
-# =======================================
-# Job Parameters to be resolved using SpecExecutor properties
-# =======================================
-type=${specExecInstance.job.type}
-
 job.jars="lib/*"
-job.lock.enabled=false
-job.class=${specExecInstance.job.launcher.class}
-
-# Gobblin Hadoop Parameters
-launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
+job.lock.enabled=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
index d9fcf6d..1a29262 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -43,14 +43,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource"
 writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
 converter.classes="org.apache.gobblin.converter.IdentityConverter"
 
-# =======================================
-# Job Parameters to be resolved using SpecExecutor properties
-# =======================================
-type=${specExecInstance.job.type}
-
 job.jars="lib/*"
-job.lock.enabled=false
-job.class=${specExecInstance.job.launcher.class}
-
-# Gobblin Hadoop Parameters
-launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
+job.lock.enabled=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
new file mode 100644
index 0000000..4bbb1d0
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban01.gobblin.net:8443
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
new file mode 100644
index 0000000..1a8856b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban02.gobblin.net:8443
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
new file mode 100644
index 0000000..6261c8b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban03.gobblin.net:8443
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
new file mode 100644
index 0000000..12125fd
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban04.gobblin.net:8443
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
new file mode 100644
index 0000000..db369de
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specExecInstance.uri=fs:///
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
new file mode 100644
index 0000000..7b67f9b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
@@ -0,0 +1,3 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specStore.fs.dir=/tmp1
+specExecInstance.capabilities=s1:d1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
new file mode 100644
index 0000000..449a076
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
@@ -0,0 +1,3 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specStore.fs.dir=/tmp2
+specExecInstance.capabilities=s2:d2
\ No newline at end of file