You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:38 UTC

[2/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
new file mode 100644
index 0000000..5d0500c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class FlowGraphPathFinderTest {
+  private FlowGraph flowGraph;
+  private FlowGraphPathFinder pathFinder;
+
+  /**
+   * Builds the fixture shared by every test: a {@link BaseFlowGraph} populated from the "flowgraph/datanodes" and
+   * "flowgraph/flowedges" classpath resources, plus a path finder for a flow from "LocalFS-1" to "ADLS-1".
+   */
+  @BeforeClass
+  public void setUp()
+      throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+    //Create a FlowGraph
+    this.flowGraph = new BaseFlowGraph();
+
+    //Add DataNodes to the graph from the node properties files
+    URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+    FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+    Path dataNodesPath = new Path(dataNodesUri);
+    ConfigParseOptions options = ConfigParseOptions.defaults()
+        .setSyntax(ConfigSyntax.PROPERTIES)
+        .setAllowMissing(false);
+
+    for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class<?> dataNodeClass = Class.forName(ConfigUtils
+            .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+        DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+        this.flowGraph.addDataNode(dataNode);
+      }
+    }
+
+    //Create a FSFlowCatalog instance rooted at the "template_catalog" test resource
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    //Add FlowEdges from the edge properties files
+    URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+    fs = FileSystem.get(flowEdgesURI, new Configuration());
+    Path flowEdgesPath = new Path(flowEdgesURI);
+    for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+      log.info("Loading flow edge from {}", fileStatus.getPath());
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class<?> flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+            FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+        this.flowGraph.addFlowEdge(edge);
+      }
+    }
+
+    //Create a flow spec
+    Properties flowProperties = new Properties();
+    flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+    flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+    flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
+    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
+    Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+    //Get the input/output dataset config from a file; read as UTF-8 like the other resources, not the platform charset
+    URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
+    Path flowConfigPath = new Path(flowConfigUri);
+    FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
+    try (InputStream is = fs1.open(flowConfigPath)) {
+      Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8));
+      flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+    }
+
+    FlowSpec spec = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+        .withConfig(flowConfig)
+        .withDescription("dummy description")
+        .withVersion(FlowSpec.Builder.DEFAULT_VERSION)
+        .build();
+    this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
+  }
+
+  /** Verifies the initial 4-hop path from "LocalFS-1" to "ADLS-1": per-hop resolved job config and spec executor. */
+  @Test
+  public void testFindPath()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+             SpecNotFoundException, IOException {
+    Dag<JobExecutionPlan> dag = pathFinder.findPath().asDag();
+    Assert.assertEquals(dag.getNodes().size(), 4);
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+
+    //Hop 1 - Distcp from "LocalFS-1" to "HDFS-1"
+    Dag.DagNode<JobExecutionPlan> firstHopNode = dag.getStartNodes().get(0);
+    JobExecutionPlan plan = firstHopNode.getValue();
+
+    //The resolved job config of the first hop must carry the expected substitutions.
+    Config hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String srcPath = hopConfig.getString("from");
+    String destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/out/testTeam/testDataset");
+    String srcFsUri = hopConfig.getString("fs.uri");
+    Assert.assertEquals(srcFsUri, "file:///");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), srcFsUri);
+    Assert.assertEquals(hopConfig.getString("state.store.fs.uri"), srcFsUri);
+    String destFsUri = hopConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(destFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("writer.fs.uri"), destFsUri);
+    Assert.assertEquals(hopConfig.getString("gobblin.dataset.pattern"), srcPath);
+    Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), destPath);
+    Assert.assertEquals(hopConfig.getString("type"), "java");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "LOCAL");
+    //The first hop must be bound to the in-memory executor
+    SpecExecutor executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "fs:///");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Hop 2 - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+    Assert.assertEquals(dag.getChildren(firstHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = dag.getChildren(firstHopNode).get(0);
+    plan = secondHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.data.directory"), srcPath);
+    Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), destPath);
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Hop 3 - "Distcp HDFS-1 to HDFS-3"
+    Assert.assertEquals(dag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = dag.getChildren(secondHopNode).get(0);
+    plan = thirdHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+    //The third hop must be bound to the azkaban01 executor
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Hop 4 - "Distcp from HDFS3 to ADLS-1"
+    Assert.assertEquals(dag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = dag.getChildren(thirdHopNode).get(0);
+    plan = fourthHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(hopConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(hopConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(hopConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //The fourth hop must be bound to the azkaban03 executor
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //The fourth hop must also be the DAG's only end node
+    Assert.assertEquals(dag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  /** Verifies that, once the HDFS-1 conversion self edge is deleted, the 4-hop path is routed via HDFS-2 and HDFS-4. */
+  @Test (dependsOnMethods = "testFindPath")
+  public void testFindPathAfterFirstEdgeDeletion()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+             SpecNotFoundException, IOException {
+    //Remove the HDFS-1 self edge that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+    Dag<JobExecutionPlan> dag = pathFinder.findPath().asDag();
+    Assert.assertEquals(dag.getNodes().size(), 4);
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+
+    //Hop 1 - Distcp from "LocalFS-1" to "HDFS-2"
+    Dag.DagNode<JobExecutionPlan> firstHopNode = dag.getStartNodes().get(0);
+    JobExecutionPlan plan = firstHopNode.getValue();
+
+    //The resolved job config of the first hop must carry the expected substitutions.
+    Config hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String srcPath = hopConfig.getString("from");
+    String destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/out/testTeam/testDataset");
+    String srcFsUri = hopConfig.getString("fs.uri");
+    Assert.assertEquals(srcFsUri, "file:///");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), srcFsUri);
+    Assert.assertEquals(hopConfig.getString("state.store.fs.uri"), srcFsUri);
+    String destFsUri = hopConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(destFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("writer.fs.uri"), destFsUri);
+    Assert.assertEquals(hopConfig.getString("gobblin.dataset.pattern"), srcPath);
+    Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), destPath);
+    Assert.assertEquals(hopConfig.getString("type"), "java");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "LOCAL");
+    //The first hop must be bound to the in-memory executor
+    SpecExecutor executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "fs:///");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Hop 2 - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+    Assert.assertEquals(dag.getChildren(firstHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = dag.getChildren(firstHopNode).get(0);
+    plan = secondHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.data.directory"), srcPath);
+    Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), destPath);
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Hop 3 - "Distcp HDFS-2 to HDFS-4"
+    Assert.assertEquals(dag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = dag.getChildren(secondHopNode).get(0);
+    plan = thirdHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+    //The third hop must be bound to the azkaban02 executor
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Hop 4 - "Distcp from HDFS4 to ADLS-1"
+    Assert.assertEquals(dag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = dag.getChildren(thirdHopNode).get(0);
+    plan = fourthHopNode.getValue();
+    hopConfig = plan.getJobSpec().getConfig();
+    Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    srcPath = hopConfig.getString("from");
+    destPath = hopConfig.getString("to");
+    Assert.assertEquals(srcPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(destPath, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(hopConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(hopConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(hopConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //The fourth hop must be bound to the azkaban04 executor
+    executor = plan.getSpecExecutor();
+    Assert.assertEquals(executor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+    Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //The fourth hop must also be the DAG's only end node
+    Assert.assertEquals(dag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  /** Verifies that no path is returned once the HDFS-2 conversion self edge has been deleted as well. */
+  @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
+  public void testFindPathAfterSecondEdgeDeletion()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException,
+             JobTemplate.TemplateException, SpecNotFoundException, IOException {
+    //Remove the HDFS-2 self edge that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+    //With that edge gone the finder is expected to return null, i.e. no path to the destination.
+    Assert.assertNull(this.pathFinder.findPath());
+  }
+
+  @AfterClass
+  public void tearDown() { //Intentionally empty: this class performs no cleanup after its tests.
+  }
+
+  /** Test-only {@link AbstractSpecExecutor}: lifecycle hooks do nothing and every getter returns a {@link CompletedFuture}. */
+  public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+    //Config handed to the constructor; returned as-is by getConfig()
+    protected final Config config;
+    //Never assigned in this class, so the future built by getProducer() is given null
+    private SpecProducer<Spec> azkabanSpecProducer;
+
+    public TestAzkabanSpecExecutor(Config config) {
+      super(config);
+      this.config = config;
+    }
+
+    @Override
+    public Future<Config> getConfig() {
+      return new CompletedFuture<>(this.config, null);
+    }
+
+    @Override
+    public Future<? extends SpecProducer> getProducer() {
+      return new CompletedFuture<>(this.azkabanSpecProducer, null);
+    }
+
+    @Override
+    public Future<String> getDescription() {
+      return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+    }
+
+    @Override
+    public Future<String> getHealth() {
+      return new CompletedFuture<>("Healthy", null);
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      //No-op
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      //No-op
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
new file mode 100644
index 0000000..2694f5c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import lombok.extern.slf4j.Slf4j;
+
+/** Checks that {@link BaseFlowEdge.Factory} builds a {@link FlowEdge} with the configured endpoints and spec executors. */
+@Slf4j
+public class BaseFlowEdgeFactoryTest {
+  @Test
+  public void testCreateFlowEdge() throws Exception {
+    Properties edgeProperties = new Properties();
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "node1");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY, "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir", "/tmp1");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities", "s1:d1");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY, "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir", "/tmp2");
+    edgeProperties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities", "s2:d2");
+    //Factory under test
+    FlowEdgeFactory edgeFactory = new BaseFlowEdge.Factory();
+    //Flow template catalog rooted at the "template_catalog" classpath resource
+    Properties catalogProperties = new Properties();
+    URI templateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    catalogProperties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, templateCatalogUri.toString());
+    Config catalogConfig = ConfigFactory.parseProperties(catalogProperties);
+    Config templateCatalogCfg = catalogConfig
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            catalogConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog templateCatalog = new FSFlowCatalog(templateCatalogCfg);
+    FlowEdge createdEdge = edgeFactory.createFlowEdge(ConfigUtils.propertiesToConfig(edgeProperties), templateCatalog);
+    Assert.assertEquals(createdEdge.getSrc(), "node1");
+    Assert.assertEquals(createdEdge.getDest(), "node2");
+    Assert.assertEquals(createdEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(createdEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+    Assert.assertEquals(createdEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(createdEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+    Assert.assertEquals(createdEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(createdEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 04f2270..be7b597 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,8 +32,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.service.modules.template.StaticFlowTemplate;
 import org.apache.gobblin.util.ConfigUtils;
@@ -57,9 +54,7 @@ public class BaseFlowGraphTest {
 
   BaseFlowGraph graph;
   @BeforeClass
-  public void setUp()
-      throws URISyntaxException, ReflectiveOperationException, JobTemplate.TemplateException, SpecNotFoundException,
-             IOException, DataNode.DataNodeCreationException {
+  public void setUp() throws URISyntaxException, DataNode.DataNodeCreationException {
     Properties properties = new Properties();
     properties.put("key1", "val1");
     Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
@@ -81,9 +76,9 @@ public class BaseFlowGraphTest {
     //Create a clone of node3
     node3c = new BaseDataNode(node3Config);
 
-    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
-    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
-    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
+    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null);
+    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null);
+    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null);
 
     //Create edge instances
     edgeId1 = "node1:node2:edge1";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
new file mode 100644
index 0000000..2542f5e
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class JobExecutionPlanDagFactoryTest { // Checks the Dag that JobExecutionPlanDagFactory#createDag builds from plans derived from the test flow template's jobs.
+  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME;
+  private SpecExecutor specExecutor;
+  private List<JobTemplate> jobTemplates;
+
+  @BeforeClass
+  public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
+    // Create an FSFlowCatalog over the "template_catalog" classpath resource and load the test flow template's job templates
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+    this.jobTemplates = flowTemplate.getJobTemplates();
+
+    // Create the spec executor instance that testCreateDag attaches to every JobExecutionPlan
+    properties = new Properties();
+    properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
+    properties.put("specExecInstance.capabilities", "source:destination");
+    Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
+    this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
+  }
+
+  @Test
+  public void testCreateDag() throws Exception {
+    // Create a list of JobExecutionPlans, one per job template; each JobSpec URI is the template file name minus its extension
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (JobTemplate jobTemplate: this.jobTemplates) {
+      String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
+      jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+          withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor));
+    }
+
+    // Create a DAG from job execution plans.
+    Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+    // Test DAG properties: a single start node and a single end node among 4 nodes, named as asserted below
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().size(), 4);
+    String startNodeName = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(startNodeName, "job1");
+    String templateUri = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job1.job");
+    String endNodeName = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(endNodeName, "job4");
+    templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job4.job");
+
+    Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
+    List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
+    Set<String> nodeSet = new HashSet<>();
+    for (Dag.DagNode<JobExecutionPlan> node: nextNodes) { // every child of the start node must have job4 as its first child
+      nodeSet.add(new Path(node.getValue().getJobSpec().getUri()).getName());
+      Dag.DagNode<JobExecutionPlan> nextNode = dag.getChildren(node).get(0);
+      Assert.assertEquals(new Path(nextNode.getValue().getJobSpec().getUri()).getName(), "job4");
+    }
+    Assert.assertTrue(nodeSet.contains("job2")); // the start node's children must include both job2 and job3
+    Assert.assertTrue(nodeSet.contains("job3"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
deleted file mode 100644
index 58d879e..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-public class JobTemplateDagFactoryTest {
-  private static final String TEST_TEMPLATE_NAME = "test-template";
-  private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
-  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
-  FSFlowCatalog catalog;
-
-  @BeforeClass
-  public void setUp()
-      throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
-    // Create a FSFlowCatalog instance
-    Properties properties = new Properties();
-    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(properties);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.catalog = new FSFlowCatalog(templateCatalogCfg);
-  }
-
-  @Test
-  public void testCreateDagFromJobTemplates() throws Exception {
-    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
-    List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
-
-    //Create a DAG from job templates.
-    Dag<JobTemplate> jobTemplateDag = JobTemplateDagFactory.createDagFromJobTemplates(jobTemplates);
-
-    //Test DAG properties
-    Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getNodes().size(), 4);
-    String startNodeName = new Path(jobTemplateDag.getStartNodes().get(0).getValue().getUri()).getName();
-    Assert.assertEquals(startNodeName, "job1.conf");
-    String endNodeName = new Path(jobTemplateDag.getEndNodes().get(0).getValue().getUri()).getName();
-    Assert.assertEquals(endNodeName, "job4.conf");
-
-    Dag.DagNode<JobTemplate> startNode = jobTemplateDag.getStartNodes().get(0);
-    List<Dag.DagNode<JobTemplate>> nextNodes = jobTemplateDag.getChildren(startNode);
-    Set<String> nodeSet = new HashSet<>();
-    for(Dag.DagNode<JobTemplate> node: nextNodes) {
-      nodeSet.add(new Path(node.getValue().getUri()).getName());
-      Dag.DagNode<JobTemplate> nextNode = jobTemplateDag.getChildren(node).get(0);
-      Assert.assertEquals(new Path(nextNode.getValue().getUri()).getName(), "job4.conf");
-    }
-    Assert.assertTrue(nodeSet.contains("job2.conf"));
-    Assert.assertTrue(nodeSet.contains("job3.conf"));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
index b20606f..3c8ebd3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
@@ -23,19 +23,20 @@ import java.util.Properties;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.dataset.HdfsDatasetDescriptor;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.testng.collections.Lists;
 
@@ -43,9 +44,8 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class FSFlowCatalogTest {
-  private static final String TEST_TEMPLATE_NAME = "test-template";
-  private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
-  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
+  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
 
   @Test
   public void testGetFlowTemplate() throws Exception {
@@ -58,50 +58,45 @@ public class FSFlowCatalogTest {
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
-    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
 
     //Basic sanity check for the FlowTemplate
-    Dag<JobTemplate> jobTemplateDag = flowTemplate.getDag();
-    List<Dag.DagNode<JobTemplate>> dagNodes = jobTemplateDag.getNodes();
-    Assert.assertTrue(dagNodes.size() == 4);
-    Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
-    Dag.DagNode<JobTemplate> dagNode = jobTemplateDag.getStartNodes().get(0);
-    URI startNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job1.conf").toURI();
-    URI endNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job4.conf").toURI();
-    Assert.assertEquals(jobTemplateDag.getStartNodes().get(0).getValue().getUri(), startNodeUri);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().get(0).getValue().getUri(), endNodeUri);
 
     List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
     Assert.assertEquals(jobTemplates.size(), 4);
-    for(int i=0; i<4; i++) {
+    for (int i = 0; i < 4; i++) {
       String uri = new Path(jobTemplates.get(i).getUri()).getName().split("\\.")[0];
       String templateId = uri.substring(uri.length() - 1);
-      for(int j=0; j<2; j++) {
+      for (int j = 0; j < 2; j++) {
         Config jobTemplateConfig = jobTemplates.get(i).getRawTemplateConfig();
-        String suffix = templateId + Integer.toString(j+1);
+        String suffix = templateId + Integer.toString(j + 1);
         Assert.assertEquals(jobTemplateConfig.getString("key" + suffix), "val" + suffix);
       }
     }
 
-    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getInputOutputDatasetDescriptors();
+    Config flowConfig = ConfigFactory.empty().withValue("team.name", ConfigValueFactory.fromAnyRef("test-team"))
+        .withValue("dataset.name", ConfigValueFactory.fromAnyRef("test-dataset"));
+
+    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getResolvingDatasetDescriptors(flowConfig);
     Assert.assertTrue(inputOutputDescriptors.size() == 2);
     List<String> dirs = Lists.newArrayList("inbound", "outbound");
-    for(int i=0; i<2; i++) {
-      for (int j=0; j<2; j++) {
-        HdfsDatasetDescriptor datasetDescriptor;
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        FSDatasetDescriptor datasetDescriptor;
         if (j == 0) {
-          datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
+          datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
         } else {
-          datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
+          datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
         }
         Assert.assertEquals(datasetDescriptor.getPlatform(), "hdfs");
-        Assert.assertEquals(datasetDescriptor.getFormat(), "avro");
-        Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/<TEAM_NAME>/<DATASET_NAME>");
+        Assert.assertEquals(datasetDescriptor.getFormatConfig().getFormat(), "avro");
+        Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/test-team/test-dataset");
       }
     }
     Config flowTemplateConfig = flowTemplate.getRawTemplateConfig();
-    Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.input.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
-    Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.output.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
+    Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX + ".0."
+        + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
+    Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX
+        + ".0." + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flow/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf
new file mode 100644
index 0000000..f818df6
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow.conf
@@ -0,0 +1,24 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+adls.user.to.proxy=adlsTestUser
+adls.oauth2.client.id=1234
+adls.ouath2.credential=credential
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - compressed and encrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=adls
+gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.output.dataset.descriptor.format=json
+gobblin.flow.output.dataset.descriptor.codec=gzip
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
new file mode 100644
index 0000000..a219e4f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=ADLS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.AdlsDataNode
+data.node.fs.uri=adl://azuredatalakestore.net/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
new file mode 100644
index 0000000..cad5e03
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn01.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
new file mode 100644
index 0000000..eeb7980
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-2
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn02.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
new file mode 100644
index 0000000..61135ba
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-3
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn03.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
new file mode 100644
index 0000000..a772f1c
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-4
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn04.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
new file mode 100644
index 0000000..6683221
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=LocalFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.LocalFSDataNode
+data.node.fs.uri=file:///
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
new file mode 100644
index 0000000..bcf6d44
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
new file mode 100644
index 0000000..99d1ed7
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -0,0 +1,10 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-3
+flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
new file mode 100644
index 0000000..537cbfa
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-2
+flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
new file mode 100644
index 0000000..6ec2ea5
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-4
+flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
new file mode 100644
index 0000000..ed6e899
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-3
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
new file mode 100644
index 0000000..eae2767
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-4
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
new file mode 100644
index 0000000..268b67f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
new file mode 100644
index 0000000..bc67810
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-2
+flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
deleted file mode 100644
index 43fa9a3..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.dircache.DirCache;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-
-
-public class GitFlowGraphMonitorTest {
-  private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);
-  private Repository remoteRepo;
-  private Git gitForPush;
-  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
-  private final File remoteDir = new File(TEST_DIR + "/remote");
-  private final File cloneDir = new File(TEST_DIR + "/clone");
-  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
-  private static final String NODE_1_FILE = "node1.properties";
-  private final File node1Dir = new File(flowGraphDir, "node1");
-  private final File node1File = new File(node1Dir, NODE_1_FILE);
-  private static final String NODE_2_FILE = "node2.properties";
-  private final File node2Dir = new File(flowGraphDir, "node2");
-  private final File node2File = new File(node2Dir, NODE_2_FILE);
-  private final File edge1Dir = new File(node1Dir, "node2");
-  private final File edge1File = new File(edge1Dir, "edge1.properties");
-
-  private RefSpec masterRefSpec = new RefSpec("master");
-  private FSFlowCatalog flowCatalog;
-  private Config config;
-  private BaseFlowGraph flowGraph;
-  private GitFlowGraphMonitor gitFlowGraphMonitor;
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    cleanUpDir(TEST_DIR);
-
-    // Create a bare repository
-    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
-    this.remoteRepo = fileKey.open(false);
-    this.remoteRepo.create(true);
-
-    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
-    // push an empty commit as a base for detecting changes
-    this.gitForPush.commit().setMessage("First commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.config = ConfigBuilder.create()
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
-            + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
-        .build();
-
-    // Create a FSFlowCatalog instance
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    Properties properties = new Properties();
-    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(properties);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
-    //Create a FlowGraph instance with defaults
-    this.flowGraph = new BaseFlowGraph();
-
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
-    this.gitFlowGraphMonitor.setActive(true);
-  }
-
-  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
-      throws IOException, GitAPIException {
-    // push a new node file
-    nodeDir.mkdirs();
-    nodeFile.createNewFile();
-    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
-
-    // add, commit, push node
-    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
-    this.gitForPush.commit().setMessage("Node commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if node1 has been added to the FlowGraph
-    DataNode dataNode = this.flowGraph.getNode(nodeId);
-    Assert.assertEquals(dataNode.getId(), nodeId);
-    Assert.assertTrue(dataNode.isActive());
-    Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
-  }
-
-  @Test
-  public void testAddNode()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
-    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
-  }
-
-  @Test (dependsOnMethods = "testAddNode")
-  public void testAddEdge()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    // push a new node file
-    this.edge1Dir.mkdirs();
-    this.edge1File.createNewFile();
-
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
-
-    // add, commit, push
-    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
-    this.gitForPush.commit().setMessage("Edge commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
-  }
-
-  @Test (dependsOnMethods = "testAddNode")
-  public void testUpdateEdge()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    //Update edge1 file
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
-        + "key1=value1\n", edge1File, Charsets.UTF_8);
-
-    // add, commit, push
-    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
-    this.gitForPush.commit().setMessage("Edge commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if new edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
-  }
-
-  @Test (dependsOnMethods = "testUpdateEdge")
-  public void testUpdateNode()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    //Update param1 value in node1 and check if updated node is added to the graph
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
-  }
-
-
-  @Test (dependsOnMethods = "testUpdateNode")
-  public void testRemoveEdge() throws GitAPIException, IOException {
-    // delete a config file
-    edge1File.delete();
-
-    //Node1 has 1 edge before delete
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-
-    // delete, commit, push
-    DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
-        this.edge1Dir.getName(), this.edge1File.getName())).call();
-    RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if edge1 has been deleted from the graph
-    edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertTrue(edgeSet.size() == 0);
-  }
-
-  @Test (dependsOnMethods = "testRemoveEdge")
-  public void testRemoveNode() throws GitAPIException, IOException {
-    //delete node file
-    node1File.delete();
-
-    //node1 is present in the graph before delete
-    DataNode node1 = this.flowGraph.getNode("node1");
-    Assert.assertNotNull(node1);
-
-    // delete, commit, push
-    DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
-    RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if node1 has been deleted from the graph
-    node1 = this.flowGraph.getNode("node1");
-    Assert.assertNull(node1);
-  }
-
-
-  private void cleanUpDir(String dir) {
-    File specStoreDir = new File(dir);
-
-    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
-    for (int i = 0; i < 5; i++) {
-      try {
-        if (specStoreDir.exists()) {
-          FileUtils.deleteDirectory(specStoreDir);
-        }
-        // if delete succeeded then break out of loop
-        break;
-      } catch (IOException e) {
-        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
-      }
-    }
-  }
-
-  private String formNodeFilePath(String groupDir, String fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
-  }
-
-  private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
-  }
-
-  @AfterClass
-  public void tearDown() throws Exception {
-    cleanUpDir(TEST_DIR);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
deleted file mode 100644
index 9dd51a0..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-import java.util.Properties;
-
-import org.apache.gobblin.util.ConfigUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class BaseFlowEdgeFactoryTest {
-  @Test
-  public void testCreateFlowEdge() throws Exception {
-    Properties properties = new Properties();
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "FS:///test-template/flow.conf");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
-
-    FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
-
-    Properties props = new Properties();
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(props);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
-    Config edgeProps = ConfigUtils.propertiesToConfig(properties);
-    FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
-  }
-}
\ No newline at end of file