Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/09 16:40:48 UTC

[2/2] incubator-gobblin git commit: [GOBBLIN-554] Change signature of SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.

[GOBBLIN-554] Change signature of SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.

Closes #2415 from sv2000/dag1
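
The DAG elements are JobExecutionPlans: each node pairs a compiled JobSpec with the
SpecExecutor chosen to run it. As a minimal, hypothetical sketch of a call-site
migration (the specCompiler and flowSpec variables are illustrative and not part of
this commit's diff):

    // Before: a flat map that could not express inter-job dependencies.
    // Map<Spec, SpecExecutor> plans = specCompiler.compileFlow(flowSpec);

    // After: a DAG whose edges capture the dependencies between jobs.
    Dag<JobExecutionPlan> dag = specCompiler.compileFlow(flowSpec);
    for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
      JobExecutionPlan plan = node.getValue();
      plan.getJobSpec();       // the materialized job
      plan.getSpecExecutor();  // the executor it should run on
    }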


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef26d287
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef26d287
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef26d287

Branch: refs/heads/master
Commit: ef26d287d95edcc5208daed651e8ef8e09efa5d0
Parents: 25530f0
Author: suvasude <su...@linkedin.biz>
Authored: Thu Aug 9 09:40:43 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Aug 9 09:40:43 2018 -0700

----------------------------------------------------------------------
 .../gobblin/runtime/api/SpecCompiler.java       |  44 --
 .../modules/flow/BaseFlowToJobSpecCompiler.java |   6 +-
 .../flow/IdentityFlowToJobSpecCompiler.java     |  71 ++--
 .../modules/flow/MultiHopFlowCompiler.java      |  39 +-
 .../flow/MultiHopsFlowToJobSpecCompiler.java    |  13 +-
 .../service/modules/flow/SpecCompiler.java      |  51 +++
 .../modules/orchestration/Orchestrator.java     |  31 +-
 .../core/IdentityFlowToJobSpecCompilerTest.java |  40 +-
 .../modules/flow/FlowGraphPathFinderTest.java   | 417 ------------------
 .../modules/flow/MultiHopFlowCompilerTest.java  | 419 +++++++++++++++++++
 .../hdfs-1-to-hdfs-1-encrypt.properties         |   2 +-
 .../flowedges/hdfs-1-to-hdfs-3.properties       |   2 +-
 .../hdfs-2-to-hdfs-2-encrypt.properties         |   2 +-
 .../flowedges/hdfs-2-to-hdfs-4.properties       |   2 +-
 .../flowedges/hdfs-3-to-adls-1.properties       |   2 +-
 .../flowedges/hdfs-4-to-adls-1.properties       |   2 +-
 16 files changed, 593 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
deleted file mode 100644
index aceb5dd..0000000
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.api;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.gobblin.instrumented.Instrumentable;
-
-/***
- * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
- * and the mapping to {@link SpecExecutor} that they can be run on.
- */
-public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
-  /***
-   * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
-   * and the mapping to {@link SpecExecutor} that they can be run on.
-   * @param spec {@link Spec} to compile.
-   * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
-   */
-  Map<Spec, SpecExecutor> compileFlow(Spec spec);
-
-  /***
-   * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
-   * is aware about.
-   * @return Map of {@link Spec} URI and {@link TopologySpec}
-   */
-  Map<URI, TopologySpec> getTopologySpecMap();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index e4c9ae3..6604676 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 
@@ -207,7 +207,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     return this.topologySpecMap;
   }
 
-  public abstract Map<Spec, SpecExecutor> compileFlow(Spec spec);
+  public abstract Dag<JobExecutionPlan> compileFlow(Spec spec);
 
   /**
    * Naive implementation of generating jobSpec, which fetch the first available template,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index 3fb20a2..fa843c5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -17,27 +17,30 @@
 
 package org.apache.gobblin.service.modules.flow;
 
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-
 import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.ServiceNode;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 
 
 /***
@@ -64,12 +67,11 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
   }
 
   @Override
-  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
     Preconditions.checkNotNull(spec);
     Preconditions.checkArgument(spec instanceof FlowSpec, "IdentityFlowToJobSpecCompiler only converts FlowSpec to JobSpec");
 
     long startTime = System.nanoTime();
-    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
 
     FlowSpec flowSpec = (FlowSpec) spec;
     String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
@@ -77,33 +79,44 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
     JobSpec jobSpec = jobSpecGenerator(flowSpec);
+    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+    List<JobExecutionPlan> jobExecutionPlans;
+    try {
+      jobExecutionPlans = getJobExecutionPlans(source, destination, jobSpec);
+    } catch (InterruptedException | ExecutionException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      throw new RuntimeException("Cannot determine topology capabilities", e);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  private List<JobExecutionPlan> getJobExecutionPlans(String source, String destination, JobSpec jobSpec)
+      throws ExecutionException, InterruptedException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
     for (TopologySpec topologySpec : topologySpecMap.values()) {
-      try {
-        Map<ServiceNode, ServiceNode> capabilities = (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get();
-        for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
-          log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with "
-                  + "capability of source: %s and destination: %s ", jobSpec.getUri(),
-              topologySpec.getUri(), capability.getKey(), capability.getValue()));
-          if (source.equals(capability.getKey().getNodeName()) && destination.equals(capability.getValue().getNodeName())) {
-            specExecutorMap.put(jobSpec, topologySpec.getSpecExecutor());
-            log.info(String.format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.",
-                jobSpec.getUri(), topologySpec.getUri()));
-
-            log.info("Since we found a candidate executor, we will not try to compute more. "
-                + "(Intended limitation for IdentityFlowToJobSpecCompiler)");
-            return specExecutorMap;
-          }
+      Map<ServiceNode, ServiceNode> capabilities =
+          (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get();
+      for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
+        log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with "
+                + "capability of source: %s and destination: %s ", jobSpec.getUri(), topologySpec.getUri(),
+            capability.getKey(), capability.getValue()));
+        if (source.equals(capability.getKey().getNodeName()) && destination
+            .equals(capability.getValue().getNodeName())) {
+          JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, topologySpec.getSpecExecutor());
+          log.info(String
+              .format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.",
+                  jobSpec.getUri(), topologySpec.getUri()));
+
+          log.info("Since we found a candidate executor, we will not try to compute more. "
+              + "(Intended limitation for IdentityFlowToJobSpecCompiler)");
+          jobExecutionPlans.add(jobExecutionPlan);
+          return jobExecutionPlans;
         }
-      } catch (InterruptedException | ExecutionException e) {
-        Instrumented.markMeter(this.flowCompilationFailedMeter);
-        throw new RuntimeException("Cannot determine topology capabilities", e);
       }
     }
-    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
-    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
-    return specExecutorMap;
+    return jobExecutionPlans;
   }
 
   @Override
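
The identity compiler above emits at most one JobExecutionPlan, which
JobExecutionPlanDagFactory assembles into a single-node DAG. A hedged sketch of
that flow, assuming jobSpec and specExecutor have already been resolved:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.gobblin.service.modules.flowgraph.Dag;
    import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
    import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;

    List<JobExecutionPlan> plans = new ArrayList<>();
    plans.add(new JobExecutionPlan(jobSpec, specExecutor));
    // createDag(...) also accepts an empty list, yielding an empty Dag that
    // callers treat as "no suitable executor found".
    Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(plans);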

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 8b14b10..a281aa4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -41,12 +42,14 @@ import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 
 
@@ -90,53 +93,50 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
   }
 
+  @VisibleForTesting
+  MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
+    super(config, Optional.absent(), true);
+    this.flowGraph = flowGraph;
+  }
+
   public void setActive(boolean active) {
     this.active = active;
     this.gitFlowGraphMonitor.setActive(active);
   }
 
   /**
-   * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
-   * job dependencies.
-   * @param spec
-   * @return
+   * Compiles the input {@link FlowSpec} into a DAG of {@link JobExecutionPlan}s.
+   * @param spec an instance of {@link FlowSpec}.
+   * @return A DAG of {@link JobExecutionPlan}s, which encapsulates the compiled {@link org.apache.gobblin.runtime.api.JobSpec}s
+   * together with the {@link SpecExecutor} where the job can be executed.
    */
   @Override
-  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
     Preconditions.checkNotNull(spec);
-    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs");
+    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
 
     long startTime = System.nanoTime();
-    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
 
     FlowSpec flowSpec = (FlowSpec) spec;
     String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
     String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
+    Dag<JobExecutionPlan> jobExecutionPlanDag;
     FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
     try {
       //Compute the path from source to destination.
       FlowGraphPath flowGraphPath = pathFinder.findPath();
 
       //Convert the path into a Dag of JobExecutionPlans.
-      Dag<JobExecutionPlan> jobExecutionPlanDag;
       if (flowGraphPath != null) {
         jobExecutionPlanDag = flowGraphPath.asDag();
       } else {
         Instrumented.markMeter(this.flowCompilationFailedMeter);
         log.info(String.format("No path found from source: %s and destination: %s", source, destination));
-        return null;
+        return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
       }
-
-      //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
-      // of a Map. For now just add all specs into the map.
-      for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) {
-        JobExecutionPlan jobExecutionPlan = node.getValue();
-        specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
-      }
-    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
-        | URISyntaxException e) {
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | IOException e) {
       Instrumented.markMeter(this.flowCompilationFailedMeter);
       log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
       return null;
@@ -144,7 +144,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
 
-    return specExecutorMap;
+    return jobExecutionPlanDag;
   }
 
   @Override
@@ -153,5 +153,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     return;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 544ca42..236f927 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
 import com.google.common.base.Splitter;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +36,10 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.policy.ServicePolicy;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.jgrapht.graph.DirectedWeightedMultigraph;
@@ -132,11 +136,16 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
   }
 
   @Override
-  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
     // A Map from JobSpec to SpexExecutor, as the output of Flow Compiler.
     Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap();
     findPath(specExecutorInstanceMap, spec);
-    return specExecutorInstanceMap;
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (Map.Entry<Spec, SpecExecutor> entry: specExecutorInstanceMap.entrySet()) {
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan((JobSpec) entry.getKey(), entry.getValue());
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
new file mode 100644
index 0000000..3ce3b70
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/***
+ * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
+ * and the mapping to {@link SpecExecutor} that they can be run on.
+ */
+public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
+  /***
+   * Take in a logical {@link Spec} and compile the corresponding materialized {@link Spec}s,
+   * each paired with the {@link SpecExecutor} it can run on.
+   * @param spec {@link Spec} to compile.
+   * @return a DAG of {@link JobExecutionPlan}s, pairing each materialized physical {@link Spec} with its {@link SpecExecutor}.
+   */
+  Dag<JobExecutionPlan> compileFlow(Spec spec);
+
+  /***
+   * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
+   * is aware about.
+   * @return Map of {@link Spec} URI and {@link TopologySpec}
+   */
+  Map<URI, TopologySpec> getTopologySpecMap();
+}
\ No newline at end of file
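
A hypothetical consumer of this interface (mirroring the Orchestrator changes
further below) checks for a null or empty DAG before scheduling each node's
JobSpec on its executor:

    Dag<JobExecutionPlan> dag = specCompiler.compileFlow(flowSpec);
    if (dag == null || dag.isEmpty()) {
      return;  // no executor could be determined for this FlowSpec
    }
    for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
      JobExecutionPlan plan = node.getValue();
      try {
        // getProducer() returns a Future; get() may throw checked exceptions.
        SpecProducer producer = plan.getSpecExecutor().getProducer().get();
        producer.addSpec(plan.getJobSpec());
      } catch (Exception e) {
        // log and continue with the remaining nodes, as the Orchestrator does
      }
    }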

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 1b3907d..4959b1a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -46,6 +46,8 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.slf4j.LoggerFactory;
@@ -186,25 +188,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
-      Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+      Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
 
-      if (specExecutorInstanceMap.isEmpty()) {
+      if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
         return;
       }
 
       // Schedule all compiled JobSpecs on their respective Executor
-      for (Map.Entry<Spec, SpecExecutor> specsToExecute : specExecutorInstanceMap.entrySet()) {
+      for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
         // Run this spec on selected executor
         SpecProducer producer = null;
         try {
-          producer = specsToExecute.getValue().getProducer().get();
-          Spec jobSpec = specsToExecute.getKey();
+          producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+          Spec jobSpec = jobExecutionPlan.getJobSpec();
 
           _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
           producer.addSpec(jobSpec);
         } catch(Exception e) {
-          _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer +
+          _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer +
               " for flow: " + spec, e);
         }
       }
@@ -221,25 +225,26 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
-      Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+      Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
 
-      if (specExecutorInstanceMap.isEmpty()) {
+      if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
         _log.warn("Cannot determine an executor to delete Spec: " + spec);
         return;
       }
 
       // Delete all compiled JobSpecs on their respective Executor
-      for (Map.Entry<Spec, SpecExecutor> specsToDelete : specExecutorInstanceMap.entrySet()) {
+      for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = dagNode.getValue();
         // Delete this spec on selected executor
         SpecProducer producer = null;
         try {
-          producer = specsToDelete.getValue().getProducer().get();
-          Spec jobSpec = specsToDelete.getKey();
+          producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+          Spec jobSpec = jobExecutionPlan.getJobSpec();
 
           _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
           producer.deleteSpec(jobSpec.getUri(), headers);
         } catch(Exception e) {
-          _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer +
+          _log.error("Cannot successfully delete spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer +
               " for flow: " + spec, e);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index 2dbe790..c1134a8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.gobblin.service.modules.core;
 
-
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
@@ -40,14 +38,15 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 
 public class IdentityFlowToJobSpecCompilerTest {
   private static final Logger logger = LoggerFactory.getLogger(IdentityFlowToJobSpecCompilerTest.class);
@@ -187,14 +186,15 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec();
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
-    Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
-    Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec.");
+    Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+    Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec.");
 
     // Assert FlowSpec compilation
-    Spec spec = specExecutorMapping.keySet().iterator().next();
+    Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0);
+    Spec spec = dagNode.getValue().getJobSpec();
     Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec.");
 
     // Assert JobSpec properties
@@ -209,6 +209,9 @@ public class IdentityFlowToJobSpecCompilerTest {
     Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
     Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP);
     Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+    //Assert the start node has no children.
+    Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
   }
 
   @Test
@@ -216,14 +219,16 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec();
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
+    Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
-    Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
-    Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec.");
+    Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+    Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec.");
 
     // Assert FlowSpec compilation
-    Spec spec = specExecutorMapping.keySet().iterator().next();
+    Assert.assertEquals(jobExecutionPlanDag.getStartNodes().size(), 1);
+    Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0);
+    Spec spec = dagNode.getValue().getJobSpec();
     Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec.");
 
     // Assert JobSpec properties
@@ -238,6 +243,9 @@ public class IdentityFlowToJobSpecCompilerTest {
     Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
     Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP);
     Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+    //Assert the start node has no children.
+    Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
   }
 
   @Test
@@ -245,10 +253,10 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink");
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
-    Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
-    Assert.assertTrue(specExecutorMapping.size() == 0, "Exepected 1 executor for FlowSpec.");
+    Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+    Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 0, "Expected 0 executors for FlowSpec.");
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
deleted file mode 100644
index 5d0500c..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import com.typesafe.config.ConfigSyntax;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
-
-@Slf4j
-public class FlowGraphPathFinderTest {
-  private FlowGraph flowGraph;
-  private FlowGraphPathFinder pathFinder;
-
-  @BeforeClass
-  public void setUp()
-      throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
-    //Create a FlowGraph
-    this.flowGraph = new BaseFlowGraph();
-
-    //Add DataNodes to the graph from the node properties files
-    URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
-    FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
-    Path dataNodesPath = new Path(dataNodesUri);
-    ConfigParseOptions options = ConfigParseOptions.defaults()
-        .setSyntax(ConfigSyntax.PROPERTIES)
-        .setAllowMissing(false);
-
-    for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
-      try (InputStream is = fs.open(fileStatus.getPath())) {
-        Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
-        Class dataNodeClass = Class.forName(ConfigUtils
-            .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
-        DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
-        this.flowGraph.addDataNode(dataNode);
-      }
-    }
-
-    //Create a FSFlowCatalog instance
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    // Create a FSFlowCatalog instance
-    Properties properties = new Properties();
-    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(properties);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
-
-    //Add FlowEdges from the edge properties files
-    URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
-    fs = FileSystem.get(flowEdgesURI, new Configuration());
-    Path flowEdgesPath = new Path(flowEdgesURI);
-    for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
-      log.warn(fileStatus.getPath().toString());
-      try (InputStream is = fs.open(fileStatus.getPath())) {
-        Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
-        Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
-            FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
-        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
-        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
-        this.flowGraph.addFlowEdge(edge);
-      }
-    }
-
-    //Create a flow spec
-    Properties flowProperties = new Properties();
-    flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
-    flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
-    flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
-    Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
-
-    //Get the input/output dataset config from a file
-    URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
-    Path flowConfigPath = new Path(flowConfigUri);
-    FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
-    try (InputStream is = fs1.open(flowConfigPath)) {
-      Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset()));
-      flowConfig = flowConfig.withFallback(datasetConfig).resolve();
-    }
-
-    FlowSpec.Builder flowSpecBuilder = null;
-    flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
-        .withConfig(flowConfig)
-        .withDescription("dummy description")
-        .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
-
-    FlowSpec spec = flowSpecBuilder.build();
-    this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
-  }
-
-  @Test
-  public void testFindPath()
-      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
-             SpecNotFoundException, IOException {
-    Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
-    Assert.assertEquals(jobDag.getNodes().size(), 4);
-    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
-
-    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
-    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
-    JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
-    JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
-
-    //Ensure the resolved job config for the first hop has the correct substitutions.
-    Config jobConfig = jobSpec.getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
-    String from = jobConfig.getString("from");
-    String to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
-    String sourceFsUri = jobConfig.getString("fs.uri");
-    Assert.assertEquals(sourceFsUri, "file:///");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
-    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
-    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
-    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
-    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
-    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
-    Assert.assertEquals(jobConfig.getString("type"), "java");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
-    //Ensure the spec executor has the correct configurations
-    SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-
-    //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
-    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
-    jobSpecWithExecutor = secondHopNode.getValue();
-    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
-    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
-    specExecutor = jobSpecWithExecutor.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
-    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
-    jobSpecWithExecutor = thirdHopNode.getValue();
-    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
-    //Ensure the spec executor has the correct configurations
-    specExecutor = jobSpecWithExecutor.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
-    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
-    jobSpecWithExecutor = fourthHopNode.getValue();
-    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
-    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
-    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
-    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
-    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
-    //Ensure the spec executor has the correct configurations
-    specExecutor = jobSpecWithExecutor.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Ensure the fourth hop is the last
-    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
-  }
-
-  @Test (dependsOnMethods = "testFindPath")
-  public void testFindPathAfterFirstEdgeDeletion()
-      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
-             SpecNotFoundException, IOException {
-    //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
-
-    Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
-    Assert.assertEquals(jobDag.getNodes().size(), 4);
-    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
-
-    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
-    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
-    JobExecutionPlan jobExecutionPlan = startNode.getValue();
-    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
-
-    //Ensure the resolved job config for the first hop has the correct substitutions.
-    Config jobConfig = jobSpec.getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
-    String from = jobConfig.getString("from");
-    String to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
-    String sourceFsUri = jobConfig.getString("fs.uri");
-    Assert.assertEquals(sourceFsUri, "file:///");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
-    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
-    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
-    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
-    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
-    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
-    Assert.assertEquals(jobConfig.getString("type"), "java");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
-    //Ensure the spec executor has the correct configurations
-    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-
-    //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
-    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
-    jobExecutionPlan = secondHopNode.getValue();
-    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
-    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
-    specExecutor = jobExecutionPlan.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
-    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
-    jobExecutionPlan = thirdHopNode.getValue();
-    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
-    //Ensure the spec executor has the correct configurations
-    specExecutor = jobExecutionPlan.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
-    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
-    jobExecutionPlan = fourthHopNode.getValue();
-    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
-    from = jobConfig.getString("from");
-    to = jobConfig.getString("to");
-    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
-    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
-    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
-    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
-    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
-    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
-    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
-    //Ensure the spec executor has the correct configurations
-    specExecutor = jobExecutionPlan.getSpecExecutor();
-    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
-    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
-    //Ensure the fourth hop is the last
-    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
-  }
-
-  @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
-  public void testFindPathAfterSecondEdgeDeletion()
-      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
-             SpecNotFoundException, IOException {
-    //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
-
-    //Ensure no path to destination.
-    Assert.assertNull(pathFinder.findPath());
-  }
-
-  @AfterClass
-  public void tearDown() {
-  }
-
-  public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
-    // Executor Instance
-    protected final Config config;
-
-    private SpecProducer<Spec> azkabanSpecProducer;
-
-    public TestAzkabanSpecExecutor(Config config) {
-      super(config);
-      this.config = config;
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      //Do nothing
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      //Do nothing
-    }
-
-    @Override
-    public Future<String> getDescription() {
-      return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
-    }
-
-    @Override
-    public Future<? extends SpecProducer> getProducer() {
-      return new CompletedFuture<>(this.azkabanSpecProducer, null);
-    }
-
-    @Override
-    public Future<Config> getConfig() {
-      return new CompletedFuture<>(config, null);
-    }
-
-    @Override
-    public Future<String> getHealth() {
-      return new CompletedFuture<>("Healthy", null);
-    }
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
new file mode 100644
index 0000000..b8bac02
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class MultiHopFlowCompilerTest {
+  private FlowGraph flowGraph;
+  private SpecCompiler specCompiler;
+
+  @BeforeClass
+  public void setUp()
+      throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+    //Create a FlowGraph
+    this.flowGraph = new BaseFlowGraph();
+
+    //Add DataNodes to the graph from the node properties files
+    URI dataNodesUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+    FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+    Path dataNodesPath = new Path(dataNodesUri);
+    ConfigParseOptions options = ConfigParseOptions.defaults()
+        .setSyntax(ConfigSyntax.PROPERTIES)
+        .setAllowMissing(false);
+
+    for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class<?> dataNodeClass = Class.forName(ConfigUtils
+            .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+        DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+        this.flowGraph.addDataNode(dataNode);
+      }
+    }
+
+    //Create an FSFlowCatalog instance backed by the flow template catalog in the test resources
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    //Add FlowEdges from the edge properties files
+    URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+    fs = FileSystem.get(flowEdgesURI, new Configuration());
+    Path flowEdgesPath = new Path(flowEdgesURI);
+    for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+      log.info("Loading FlowEdge config: " + fileStatus.getPath());
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class<?> flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+            FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+        this.flowGraph.addFlowEdge(edge);
+      }
+    }
+
+    this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
+  }
+
+  private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination) throws IOException, URISyntaxException {
+    //Create a flow spec
+    Properties flowProperties = new Properties();
+    flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+    flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+    flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
+    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination);
+    Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+    //Get the input/output dataset config from a file
+    URI flowConfigUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource(flowConfigResource).toURI();
+    Path flowConfigPath = new Path(flowConfigUri);
+    FileSystem fs = FileSystem.get(flowConfigUri, new Configuration());
+    try (InputStream is = fs.open(flowConfigPath)) {
+      Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8));
+      flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+    }
+
+    FlowSpec.Builder flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+        .withConfig(flowConfig)
+        .withDescription("dummy description")
+        .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
+
+    return flowSpecBuilder.build();
+  }
+
+  @Test
+  public void testCompileFlow() throws URISyntaxException, IOException {
+    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+    Assert.assertEquals(jobDag.getNodes().size(), 4);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
+    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
+    JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
+
+    //Ensure the resolved job config for the first hop has the correct substitutions.
+    Config jobConfig = jobSpec.getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String from = jobConfig.getString("from");
+    String to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+    String sourceFsUri = jobConfig.getString("fs.uri");
+    Assert.assertEquals(sourceFsUri, "file:///");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    Assert.assertEquals(jobConfig.getString("type"), "java");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+    //Ensure the spec executor has the correct configurations
+    SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+    jobSpecWithExecutor = secondHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
+    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+    jobSpecWithExecutor = thirdHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
+    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+    jobSpecWithExecutor = fourthHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Ensure the fourth hop is the last
+    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  @Test (dependsOnMethods = "testCompileFlow")
+  public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException {
+    //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+
+    Assert.assertEquals(jobDag.getNodes().size(), 4);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
+    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    JobExecutionPlan jobExecutionPlan = startNode.getValue();
+    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+
+    //Ensure the resolved job config for the first hop has the correct substitutions.
+    Config jobConfig = jobSpec.getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String from = jobConfig.getString("from");
+    String to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+    String sourceFsUri = jobConfig.getString("fs.uri");
+    Assert.assertEquals(sourceFsUri, "file:///");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    Assert.assertEquals(jobConfig.getString("type"), "java");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+    //Ensure the spec executor has the correct configurations
+    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+    jobExecutionPlan = secondHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
+    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+    jobExecutionPlan = thirdHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
+    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+    jobExecutionPlan = fourthHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+    //Ensure the fourth hop is the last
+    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  @Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
+  public void testCompileFlowAfterSecondEdgeDeletion() throws URISyntaxException, IOException {
+    //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+
+    FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+
+    //Ensure no path to destination.
+    Assert.assertTrue(jobDag.isEmpty());
+  }
+
+  @AfterClass
+  public void tearDown() {
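+    //Nothing to clean up: the FlowGraph and compiler are held entirely in memory.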
+  }
+
+  public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+    // Executor Instance
+    protected final Config config;
+
+    private SpecProducer<Spec> azkabanSpecProducer;
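+    //Never assigned in these tests, which exercise flow compilation but not spec submission.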
+
+    public TestAzkabanSpecExecutor(Config config) {
+      super(config);
+      this.config = config;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      //Do nothing
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      //Do nothing
+    }
+
+    @Override
+    public Future<String> getDescription() {
+      return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+    }
+
+    @Override
+    public Future<? extends SpecProducer> getProducer() {
+      return new CompletedFuture<>(this.azkabanSpecProducer, null);
+    }
+
+    @Override
+    public Future<Config> getConfig() {
+      return new CompletedFuture<>(config, null);
+    }
+
+    @Override
+    public Future<String> getHealth() {
+      return new CompletedFuture<>("Healthy", null);
+    }
+
+  }
+}
\ No newline at end of file
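
As a minimal sketch of the new contract (not part of this patch), a caller such as the
Orchestrator might walk the Dag<JobExecutionPlan> returned by compileFlow() as follows.
It uses only the Dag and JobExecutionPlan accessors exercised in the tests above;
`specCompiler`, `flowSpec`, and a `log` are assumed in scope, and java.util.ArrayDeque
serves as the worklist. The level-order walk matches the hop order here only because
the paths compiled in these tests are linear chains:

    Dag<JobExecutionPlan> jobDag = specCompiler.compileFlow(flowSpec);
    if (jobDag.isEmpty()) {
      //An empty Dag now signals that no path exists from source to destination.
      return;
    }
    //Visit hops beginning at the start nodes, following child edges.
    Deque<Dag.DagNode<JobExecutionPlan>> toVisit = new ArrayDeque<>(jobDag.getStartNodes());
    while (!toVisit.isEmpty()) {
      Dag.DagNode<JobExecutionPlan> node = toVisit.poll();
      JobExecutionPlan plan = node.getValue();
      //Each hop carries a fully resolved JobSpec plus the SpecExecutor chosen for its edge.
      log.info("Hop " + plan.getJobSpec().getConfig().getString("job.name")
          + " -> executor " + plan.getSpecExecutor().getUri());
      toVisit.addAll(jobDag.getChildren(node));
    }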

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index bcf6d44..44d3c44 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-1
 flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
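+# Note: the '$' separator is the binary name that Class.forName() needs for the nested
+# TestAzkabanSpecExecutor; the tests assert on getCanonicalName(), the dotted form.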
 flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
 flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 99d1ed7..897f003 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-1
 flow.edge.destination=HDFS-3
 flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
 flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
 flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index 537cbfa..db0ea48 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-2
 flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
 flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
 flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 6ec2ea5..44f0408 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-2
 flow.edge.destination=HDFS-4
 flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
 flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
 flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
 flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE