Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/18 06:22:52 UTC

incubator-gobblin git commit: [GOBBLIN-559] Implement FlowGraph as a concurrent data structure.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master eb2c128dc -> 5a6bfea9f


[GOBBLIN-559] Implement FlowGraph as a concurrent data structure.

Closes #2423 from sv2000/multithread


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5a6bfea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5a6bfea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5a6bfea9

Branch: refs/heads/master
Commit: 5a6bfea9f689360ffb7c5a5c4fef95487b5b4e4c
Parents: eb2c128
Author: sv2000 <su...@gmail.com>
Authored: Fri Aug 17 23:22:54 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Aug 17 23:22:54 2018 -0700

----------------------------------------------------------------------
 .../modules/flow/FlowGraphPathFinder.java       | 345 -------------------
 .../modules/flow/MultiHopFlowCompiler.java      |  38 +-
 .../modules/flowgraph/BaseFlowGraph.java        | 146 +++++---
 .../service/modules/flowgraph/FlowGraph.java    |  19 +
 .../flowgraph/FlowGraphConfigurationKeys.java   |   8 +-
 .../pathfinder/AbstractPathFinder.java          | 259 ++++++++++++++
 .../flowgraph/pathfinder/BFSPathFinder.java     | 138 ++++++++
 .../flowgraph/pathfinder/PathFinder.java        |  44 +++
 8 files changed, 587 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
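For readers skimming the diff below: the core of this change is that BaseFlowGraph replaces its per-method synchronized qualifiers with a fair ReentrantReadWriteLock, and path computation (now pluggable behind the new PathFinder interface) runs under the graph's read lock, so concurrent compilations only block on writers rather than on each other. A minimal, self-contained sketch of that locking pattern follows; it is an illustration in the spirit of the commit, not code taken from it:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Simplified stand-in for BaseFlowGraph: writers mutate the graph exclusively, readers may overlap. */
public class ConcurrentGraphSketch<N, E> {
  // Fair lock, as in the commit: waiting writers are not starved by a steady stream of readers.
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
  private final Map<N, Set<E>> nodesToEdges = new HashMap<>();

  /** Analogous to addDataNode(): exclusive write access. */
  public void addNode(N node) {
    rwLock.writeLock().lock();
    try {
      nodesToEdges.putIfAbsent(node, new HashSet<>());
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  /** Analogous to getEdges()/findPath(): shared read access; many readers can run concurrently. */
  public Set<E> getEdges(N node) {
    rwLock.readLock().lock();
    try {
      return nodesToEdges.getOrDefault(node, new HashSet<>());
    } finally {
      rwLock.readLock().unlock();
    }
  }
}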


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
deleted file mode 100644
index 59c831d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
-
-@Alpha
-@Slf4j
-public class FlowGraphPathFinder {
-  private static final String SOURCE_PREFIX = "source";
-  private static final String DESTINATION_PREFIX = "destination";
-
-  private FlowGraph flowGraph;
-  private FlowSpec flowSpec;
-  private Config flowConfig;
-
-  private DataNode srcNode;
-  private List<DataNode> destNodes;
-
-  private DatasetDescriptor srcDatasetDescriptor;
-  private DatasetDescriptor destDatasetDescriptor;
-
-  //Maintain path of FlowEdges as parent-child map
-  private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
-
-  //Flow Execution Id
-  private Long flowExecutionId;
-
-  /**
-   * Constructor.
-   * @param flowGraph
-   */
-  public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
-    this.flowGraph = flowGraph;
-    this.flowSpec = flowSpec;
-    this.flowConfig = flowSpec.getConfig();
-
-    //Get src/dest DataNodes from the flow config
-    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
-
-    List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
-    this.srcNode = this.flowGraph.getNode(srcNodeId);
-    Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
-    for (String destNodeId : destNodeIds) {
-      DataNode destNode = this.flowGraph.getNode(destNodeId);
-      Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
-      if (this.destNodes == null) {
-        this.destNodes = new ArrayList<>();
-      }
-      this.destNodes.add(destNode);
-    }
-    //Get src/dest dataset descriptors from the flow config
-    Config srcDatasetDescriptorConfig =
-        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
-    Config destDatasetDescriptorConfig =
-        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
-
-    try {
-      Class srcdatasetDescriptorClass =
-          Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
-      this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
-          .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig);
-      Class destDatasetDescriptorClass =
-          Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
-      this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
-          .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public FlowGraphPath findPath() throws PathFinderException {
-    // Generate flow execution id for this compilation
-    this.flowExecutionId = System.currentTimeMillis();
-
-    FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
-    //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
-    // flow graph.
-    // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
-    // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
-    synchronized (this.flowGraph) {
-      for (DataNode destNode : this.destNodes) {
-        List<FlowEdgeContext> path = findPathBFS(destNode);
-        if (path != null) {
-          flowGraphPath.addPath(path);
-        } else {
-          //No path to at least one of the destination nodes.
-          return null;
-        }
-      }
-    }
-    return flowGraphPath;
-  }
-
-  /**
-   * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
-   * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
-   * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
-   * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode.
-   */
-  private List<FlowEdgeContext> findPathBFS(DataNode destNode)
-      throws PathFinderException {
-    try {
-      //Initialization of auxiliary data structures used for path computation
-      this.pathMap = new HashMap<>();
-
-      //Base condition 1: Source Node or Dest Node is inactive; return null
-      if (!srcNode.isActive() || !destNode.isActive()) {
-        log.warn("Either source node {} or destination node {} is inactive; skipping path computation.",
-            this.srcNode.getId(), destNode.getId());
-        return null;
-      }
-
-      //Base condition 2: Check if we are already at the target. If so, return an empty path.
-      if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
-        return new ArrayList<>();
-      }
-
-      LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
-      edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
-      for (FlowEdgeContext flowEdgeContext : edgeQueue) {
-        this.pathMap.put(flowEdgeContext, flowEdgeContext);
-      }
-
-      //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges
-      // to the edge E. For each adjacent edge E', do the following:
-      //    1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
-      //    2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
-      //       edge E'. If yes, add the edge E' to the edge queue.
-      // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
-      while (!edgeQueue.isEmpty()) {
-        FlowEdgeContext flowEdgeContext = edgeQueue.pop();
-
-        DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
-        DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
-
-        //Are we done?
-        if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
-          return constructPath(flowEdgeContext);
-        }
-
-        //Expand the currentNode to its adjacent edges and add them to the queue.
-        List<FlowEdgeContext> nextEdges =
-            getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
-        for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
-          //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
-          // queue.
-          if (!this.pathMap.containsKey(childFlowEdgeContext)) {
-            edgeQueue.add(childFlowEdgeContext);
-            this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
-          }
-        }
-      }
-      //No path found. Return null.
-      return null;
-    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
-      throw new PathFinderException(
-          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode
-              .getId(), e);
-    }
-  }
-
-  private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
-      DatasetDescriptor destDatasetDescriptor) {
-    if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an
-   * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor.
-   * @param dataNode
-   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
-   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
-   * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
-   */
-  private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
-      DatasetDescriptor destDatasetDescriptor) {
-    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
-    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
-      try {
-        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
-        //Base condition: Skip this FLowEdge, if it is inactive or if the destination of this edge is inactive.
-        if (!edgeDestination.isActive() || !flowEdge.isActive()) {
-          continue;
-        }
-
-        boolean foundExecutor = false;
-        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
-        for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
-          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
-          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
-              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
-          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
-            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
-            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
-            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
-              FlowEdgeContext flowEdgeContext;
-              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
-                //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
-                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
-                // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
-                flowEdgeContext =
-                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig,
-                        specExecutor);
-              } else {
-                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
-                flowEdgeContext =
-                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig,
-                        specExecutor);
-              }
-              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
-                //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
-                // with those of destination dataset descriptor.
-                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
-                prioritizedEdgeList.add(0, flowEdgeContext);
-              } else {
-                prioritizedEdgeList.add(flowEdgeContext);
-              }
-              foundExecutor = true;
-            }
-          }
-          // Found a SpecExecutor. Proceed to the next FlowEdge.
-          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
-          if (foundExecutor) {
-            break;
-          }
-        }
-      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
-          | JobTemplate.TemplateException e) {
-        //Skip the edge; and continue
-        log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
-      }
-    }
-    return prioritizedEdgeList;
-  }
-
-  /**
-   * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below):
-   * <ul>
-   *   <p> the user provided flow config </p>
-   *   <p> edge specific properties/overrides </p>
-   *   <p> spec executor config/overrides </p>
-   *   <p> source node config </p>
-   *   <p> destination node config </p>
-   * </ul>
-   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
-   * @param flowEdge An instance of {@link FlowEdge}.
-   * @param specExecutor A {@link SpecExecutor}.
-   * @return the merged config derived as described above.
-   */
-  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
-      throws ExecutionException, InterruptedException {
-    Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
-    Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
-    Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
-        .withFallback(srcNodeConfig).withFallback(destNodeConfig);
-    return mergedConfig;
-  }
-
-  /**
-   *
-   * @param flowEdgeContext of the last {@link FlowEdge} in the path.
-   * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}.
-   * @throws IOException
-   * @throws SpecNotFoundException
-   * @throws JobTemplate.TemplateException
-   * @throws URISyntaxException
-   */
-  private List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext)
-      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
-    //Backtrace from the last edge using the path map and push each edge into a LIFO data structure.
-    List<FlowEdgeContext> path = new LinkedList<>();
-    path.add(flowEdgeContext);
-    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
-    while (true) {
-      path.add(0, this.pathMap.get(currentFlowEdgeContext));
-      currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
-      //Are we at the first edge in the path?
-      if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
-        break;
-      }
-    }
-    return path;
-  }
-
-  public static class PathFinderException extends Exception {
-    public PathFinderException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    public PathFinderException(String message) {
-      super(message);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 83951a0..1ab8312 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flow;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -28,7 +27,6 @@ import org.slf4j.Logger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -42,15 +40,16 @@ import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /***
@@ -60,8 +59,9 @@ import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 @Alpha
 @Slf4j
 public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+
   @Getter
-  private FlowGraph flowGraph;
+  private final FlowGraph flowGraph;
   private GitFlowGraphMonitor gitFlowGraphMonitor;
   @Getter
   private boolean active;
@@ -80,9 +80,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     FSFlowCatalog flowCatalog;
     try {
       flowCatalog = new FSFlowCatalog(templateCatalogCfg);
@@ -118,32 +117,32 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     long startTime = System.nanoTime();
 
     FlowSpec flowSpec = (FlowSpec) spec;
-    String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
-    String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String destination =
+        ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag;
-    FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
     try {
       //Compute the path from source to destination.
-      FlowGraphPath flowGraphPath = pathFinder.findPath();
-
+      FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
       //Convert the path into a Dag of JobExecutionPlans.
       if (flowGraphPath != null) {
         jobExecutionPlanDag = flowGraphPath.asDag();
       } else {
-        Instrumented.markMeter(this.flowCompilationFailedMeter);
+        Instrumented.markMeter(flowCompilationFailedMeter);
         log.info(String.format("No path found from source: %s and destination: %s", source, destination));
         return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
       }
-    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException e) {
-      Instrumented.markMeter(this.flowCompilationFailedMeter);
-      log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+    } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
+      Instrumented.markMeter(flowCompilationFailedMeter);
+      log.error(String
+              .format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination),
+          e);
       return null;
     }
-    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
-    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
+    Instrumented.markMeter(flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
     return jobExecutionPlanDag;
   }
 
@@ -152,5 +151,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     log.warn("No population of templates based on edge happen in this implementation");
     return;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index edf40cc..e2b256f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -21,8 +21,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -32,10 +39,14 @@ import lombok.extern.slf4j.Slf4j;
  *   <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p>
  *   <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p>
  *   <p>flowEdgeMap - the mapping from a edge label to the {@link FlowEdge} instance</p>
+ *
+ *   Read/Write Access to the {@link FlowGraph} is synchronized via a {@link ReentrantReadWriteLock}.
  */
 @Alpha
 @Slf4j
 public class BaseFlowGraph implements FlowGraph {
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
   private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
   private Map<String, DataNode> dataNodeMap = new HashMap<>();
   private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
@@ -58,11 +69,16 @@ public class BaseFlowGraph implements FlowGraph {
    * @return true if node is successfully added to the {@link FlowGraph}.
    */
   @Override
-  public synchronized boolean addDataNode(DataNode node) {
-    //Get edges adjacent to the node if it already exists
-    Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>());
-    this.nodesToEdges.put(node, edges);
-    this.dataNodeMap.put(node.getId(), node);
+  public boolean addDataNode(DataNode node) {
+    try {
+      rwLock.writeLock().lock();
+      //Get edges adjacent to the node if it already exists
+      Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>());
+      this.nodesToEdges.put(node, edges);
+      this.dataNodeMap.put(node.getId(), node);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
     return true;
   }
 
@@ -74,16 +90,20 @@ public class BaseFlowGraph implements FlowGraph {
    * @return true if addition of {@FlowEdge} is successful.
    */
   @Override
-  public synchronized boolean addFlowEdge(FlowEdge edge) {
-    String srcNode = edge.getSrc();
-    String dstNode = edge.getDest();
-    if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
-      return false;
-    }
-    DataNode dataNode = getNode(srcNode);
-    if(dataNode != null) {
+  public boolean addFlowEdge(FlowEdge edge) {
+    try {
+      rwLock.writeLock().lock();
+      String srcNode = edge.getSrc();
+      String dstNode = edge.getDest();
+      if (!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
+        return false;
+      }
+      DataNode dataNode = getNode(srcNode);
+      if (dataNode == null) {
+        return false;
+      }
       Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(dataNode);
-      if(!adjacentEdges.add(edge)) {
+      if (!adjacentEdges.add(edge)) {
         adjacentEdges.remove(edge);
         adjacentEdges.add(edge);
       }
@@ -91,8 +111,8 @@ public class BaseFlowGraph implements FlowGraph {
       String edgeId = edge.getId();
       this.flowEdgeMap.put(edgeId, edge);
       return true;
-    } else {
-      return false;
+    } finally {
+      rwLock.writeLock().unlock();
     }
   }
 
@@ -102,11 +122,12 @@ public class BaseFlowGraph implements FlowGraph {
    * @return true if {@link DataNode} is successfully deleted.
    */
   @Override
-  public synchronized boolean deleteDataNode(String nodeId) {
-    if(this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId))) {
-      return true;
-    } else {
-      return false;
+  public boolean deleteDataNode(String nodeId) {
+    try {
+      rwLock.writeLock().lock();
+      return this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId));
+    } finally {
+      rwLock.writeLock().unlock();
     }
   }
 
@@ -115,20 +136,25 @@ public class BaseFlowGraph implements FlowGraph {
    * @param node to be deleted.
    * @return true if {@link DataNode} is successfully deleted.
    */
-  public synchronized boolean deleteDataNode(DataNode node) {
-    if(dataNodeMap.containsKey(node.getId())) {
+  public boolean deleteDataNode(DataNode node) {
+    try {
+      rwLock.writeLock().lock();
+      if (!dataNodeMap.containsKey(node.getId())) {
+        return false;
+      }
       //Delete node from dataNodeMap
       dataNodeMap.remove(node.getId());
 
       //Delete all the edges adjacent to the node. First, delete edges from flowEdgeMap and next, remove the edges
       // from nodesToEdges
-      for(FlowEdge edge: nodesToEdges.get(node)) {
+      for (FlowEdge edge : nodesToEdges.get(node)) {
         flowEdgeMap.remove(edge.getId());
       }
       nodesToEdges.remove(node);
       return true;
-    } else {
-      return false;
+
+    } finally {
+      rwLock.writeLock().unlock();
     }
   }
 
@@ -138,11 +164,12 @@ public class BaseFlowGraph implements FlowGraph {
    * @return true if {@link FlowEdge} is successfully deleted.
    */
   @Override
-  public synchronized boolean deleteFlowEdge(String edgeId) {
-    if(flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId))) {
-      return true;
-    } else {
-      return false;
+  public boolean deleteFlowEdge(String edgeId) {
+    try {
+      rwLock.writeLock().lock();
+      return flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId));
+    } finally {
+      rwLock.writeLock().unlock();
     }
   }
 
@@ -152,17 +179,22 @@ public class BaseFlowGraph implements FlowGraph {
    * @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or
    * if the {@link FlowEdge} is not in the graph, return false.
    */
-  public synchronized boolean deleteFlowEdge(FlowEdge edge) {
-    if(!dataNodeMap.containsKey(edge.getSrc())) {
-      return false;
-    }
-    DataNode node = dataNodeMap.get(edge.getSrc());
-    if(!nodesToEdges.get(node).contains(edge)) {
-      return false;
+  public boolean deleteFlowEdge(FlowEdge edge) {
+    try {
+      rwLock.writeLock().lock();
+      if (!dataNodeMap.containsKey(edge.getSrc())) {
+        return false;
+      }
+      DataNode node = dataNodeMap.get(edge.getSrc());
+      if (!nodesToEdges.get(node).contains(edge)) {
+        return false;
+      }
+      this.nodesToEdges.get(node).remove(edge);
+      this.flowEdgeMap.remove(edge.getId());
+      return true;
+    } finally {
+      rwLock.writeLock().unlock();
     }
-    this.nodesToEdges.get(node).remove(edge);
-    this.flowEdgeMap.remove(edge.getId());
-    return true;
   }
 
   /**
@@ -172,8 +204,13 @@ public class BaseFlowGraph implements FlowGraph {
    */
   @Override
   public Set<FlowEdge> getEdges(String nodeId) {
-    DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null);
-    return getEdges(dataNode);
+    try {
+      rwLock.readLock().lock();
+      DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null);
+      return getEdges(dataNode);
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
   /**
@@ -183,7 +220,28 @@ public class BaseFlowGraph implements FlowGraph {
    */
   @Override
   public Set<FlowEdge> getEdges(DataNode node) {
-    return (node != null)? this.nodesToEdges.getOrDefault(node, null) : null;
+    try {
+      rwLock.readLock().lock();
+      return (node != null) ? this.nodesToEdges.getOrDefault(node, null) : null;
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
+  /**{@inheritDoc}**/
+  @Override
+  public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException {
+    try {
+      rwLock.readLock().lock();
+      //Instantiate a PathFinder.
+      Class pathFinderClass = Class.forName(ConfigUtils
+          .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
+              FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
+      PathFinder pathFinder =
+          (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec);
+      return pathFinder.findPath();
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
 }
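To make the effect of the new lock concrete: read-locked operations such as findPath() and getEdges() can now run in parallel, while write-locked operations such as addFlowEdge() or deleteDataNode() still execute exclusively. A small JDK-only demonstration of that behavior (thread counts, sleep times and messages are illustrative, not taken from the commit):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Read-locked sections (like findPath) may overlap; a write-locked section (like addFlowEdge) runs alone. */
public class ReadWriteLockDemo {
  private static final ReadWriteLock LOCK = new ReentrantReadWriteLock(true);

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 3; i++) {
      final int id = i;
      pool.submit(() -> {
        LOCK.readLock().lock();    // analogous to findPath()
        try {
          System.out.println("reader " + id + " computing a path");
          Thread.sleep(100);       // several readers overlap here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          LOCK.readLock().unlock();
        }
      });
    }
    pool.submit(() -> {
      LOCK.writeLock().lock();     // analogous to addFlowEdge()
      try {
        System.out.println("writer updating the graph exclusively");
      } finally {
        LOCK.writeLock().unlock();
      }
    });
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}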

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index b4aa7bf..1d8eb23 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -19,7 +19,13 @@ package org.apache.gobblin.service.modules.flowgraph;
 
 import java.util.Collection;
 
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 
 
 /**
@@ -80,4 +86,17 @@ public interface FlowGraph {
    * @return a collection of edges adjacent to the {@link DataNode}
    */
   public Collection<FlowEdge> getEdges(DataNode node);
+
+  /**
+   * A method that takes a {@link FlowSpec} containing the source and destination {@link DataNode}s, as well as the
+   * source and target {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}s, and returns a sequence
+   * of fully resolved {@link org.apache.gobblin.runtime.api.JobSpec}s that will move the source dataset
+   * from the source datanode, perform any necessary transformations and land the dataset at the destination node
+   * in the format described by the target {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}.
+   *
+   * @param flowSpec a {@link org.apache.gobblin.runtime.api.Spec} containing a high-level description of input flow.
+   * @return an instance of {@link FlowGraphPath} that encapsulates a sequence of {@link org.apache.gobblin.runtime.api.JobSpec}s
+   * satisfying flowSpec.
+   */
+  public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index 8a49ec0..5a43a83 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -20,7 +20,7 @@ package org.apache.gobblin.service.modules.flowgraph;
 public class FlowGraphConfigurationKeys {
   public static final String DATA_NODE_PREFIX = "data.node.";
   public static final String FLOW_EDGE_PREFIX = "flow.edge.";
-
+  public static final String FLOW_GRAPH_PREFIX = "flow.graph.";
   /**
    *   {@link DataNode} related configuration keys.
    */
@@ -42,4 +42,10 @@ public class FlowGraphConfigurationKeys {
   public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
   public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
   public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
+
+  /**
+   * {@link org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder} related configuration keys.
+   */
+  public static final String FLOW_GRAPH_PATH_FINDER_CLASS = FLOW_GRAPH_PREFIX + "pathfinder.class";
+  public static final String DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS = "org.apache.gobblin.service.modules.flowgraph.pathfinder.BFSPathFinder";
 }
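The new flow.graph.pathfinder.class key makes the path-finding algorithm pluggable per flow: BaseFlowGraph#findPath() reads it from the FlowSpec's config and falls back to BFSPathFinder when it is absent. A hedged sketch of selecting a different implementation via that key (the com.example.DijkstraPathFinder class name is hypothetical; only BFSPathFinder ships with this commit):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class PathFinderSelectionExample {
  public static void main(String[] args) {
    // Hypothetical flow config overriding the default path finder.
    Config flowConfig = ConfigFactory.parseString(
        "flow.graph.pathfinder.class = \"com.example.DijkstraPathFinder\"");

    // Mirrors the lookup in BaseFlowGraph#findPath(): use the configured class if present,
    // otherwise fall back to the default BFSPathFinder.
    String defaultClass = "org.apache.gobblin.service.modules.flowgraph.pathfinder.BFSPathFinder";
    String pathFinderClass = flowConfig.hasPath("flow.graph.pathfinder.class")
        ? flowConfig.getString("flow.graph.pathfinder.class") : defaultClass;
    System.out.println("PathFinder class to instantiate: " + pathFinderClass);
  }
}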

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
new file mode 100644
index 0000000..9901c2f
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Alpha
+@Slf4j
+public abstract class AbstractPathFinder implements PathFinder {
+  private static final String SOURCE_PREFIX = "source";
+  private static final String DESTINATION_PREFIX = "destination";
+
+  protected FlowGraph flowGraph;
+  protected FlowSpec flowSpec;
+  protected Config flowConfig;
+
+  protected DataNode srcNode;
+  protected List<DataNode> destNodes;
+
+  protected DatasetDescriptor srcDatasetDescriptor;
+  protected DatasetDescriptor destDatasetDescriptor;
+
+  //Maintain path of FlowEdges as parent-child map
+  protected Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+  //Flow Execution Id
+  protected Long flowExecutionId;
+
+  public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+      throws ReflectiveOperationException {
+    this.flowGraph = flowGraph;
+    this.flowSpec = flowSpec;
+    this.flowConfig = flowSpec.getConfig();
+
+    //Get src/dest DataNodes from the flow config
+    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+
+    List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    this.srcNode = this.flowGraph.getNode(srcNodeId);
+    Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+    for (String destNodeId : destNodeIds) {
+      DataNode destNode = this.flowGraph.getNode(destNodeId);
+      Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+      if (this.destNodes == null) {
+        this.destNodes = new ArrayList<>();
+      }
+      this.destNodes.add(destNode);
+    }
+    //Get src/dest dataset descriptors from the flow config
+    Config srcDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config destDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+    Class srcdatasetDescriptorClass =
+        Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+    this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+        .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig);
+    Class destDatasetDescriptorClass =
+        Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+    this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+        .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
+  }
+
+  protected boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an
+   * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor.
+   * @param dataNode
+   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+   * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
+   */
+  protected List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      try {
+        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+        //Base condition: Skip this FlowEdge if it is inactive or if the destination of this edge is inactive.
+        if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+          continue;
+        }
+
+        boolean foundExecutor = false;
+        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+        for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
+          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+              FlowEdgeContext flowEdgeContext;
+              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+                //If the datasets described by the currentDatasetDescriptor are a subset of the datasets described
+                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+                // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+                flowEdgeContext =
+                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig,
+                        specExecutor);
+              } else {
+                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+                flowEdgeContext =
+                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig,
+                        specExecutor);
+              }
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+                //Add to the front of the edge list if the platform-independent properties of the output descriptor are compatible
+                // with those of destination dataset descriptor.
+                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+                prioritizedEdgeList.add(0, flowEdgeContext);
+              } else {
+                prioritizedEdgeList.add(flowEdgeContext);
+              }
+              foundExecutor = true;
+            }
+          }
+          // Found a SpecExecutor. Proceed to the next FlowEdge.
+          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+          if (foundExecutor) {
+            break;
+          }
+        }
+      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+          | JobTemplate.TemplateException e) {
+        //Skip the edge; and continue
+        log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
+      }
+    }
+    return prioritizedEdgeList;
+  }
+
+  /**
+   * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below):
+   * <ul>
+   *   <p> the user provided flow config </p>
+   *   <p> edge specific properties/overrides </p>
+   *   <p> spec executor config/overrides </p>
+   *   <p> source node config </p>
+   *   <p> destination node config </p>
+   * </ul>
+   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+   * @param flowEdge An instance of {@link FlowEdge}.
+   * @param specExecutor A {@link SpecExecutor}.
+   * @return the merged config derived as described above.
+   */
+  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+      throws ExecutionException, InterruptedException {
+    Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+    Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+    Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
+        .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+    return mergedConfig;
+  }
+
+  /**
+   *
+   * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+   * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}.
+   * @throws IOException
+   * @throws SpecNotFoundException
+   * @throws JobTemplate.TemplateException
+   * @throws URISyntaxException
+   */
+  protected List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    //Backtrace from the last edge using the path map and push each edge into a LIFO data structure.
+    List<FlowEdgeContext> path = new LinkedList<>();
+    path.add(flowEdgeContext);
+    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+    while (true) {
+      path.add(0, this.pathMap.get(currentFlowEdgeContext));
+      currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+      //Are we at the first edge in the path?
+      if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
+        break;
+      }
+    }
+    return path;
+  }
+
+  @Override
+  public FlowGraphPath findPath() throws PathFinderException {
+    // Generate flow execution id for this compilation
+    this.flowExecutionId = System.currentTimeMillis();
+
+    FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
+    //Path computation must be thread-safe to guarantee read consistency. With this change, the caller
+    // (BaseFlowGraph#findPath()) holds the graph's read lock while this method runs.
+    for (DataNode destNode : this.destNodes) {
+      List<FlowEdgeContext> path = findPathUnicast(destNode);
+      if (path != null) {
+        flowGraphPath.addPath(path);
+      } else {
+        //No path to at least one of the destination nodes.
+        return null;
+      }
+    }
+    return flowGraphPath;
+  }
+
+  public abstract List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException;
+}
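getMergedConfig() above builds each edge's resolution config by chaining withFallback() calls, so keys from configs earlier in the chain take precedence. A minimal sketch of the typesafe-config semantics this relies on (the keys and values here are made up for illustration):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class MergedConfigExample {
  public static void main(String[] args) {
    // Stand-ins for the flow config and an edge config; the real ones come from the
    // FlowSpec, the SpecExecutor, the FlowEdge and the source/destination DataNodes.
    Config flowConfig = ConfigFactory.parseString("writer.fs.uri = \"hdfs://flow-override\"");
    Config edgeConfig = ConfigFactory.parseString(
        "writer.fs.uri = \"hdfs://edge-default\"\nstaging.dir = \"/tmp/edge\"");

    // withFallback(): keys already present in the left-hand config take precedence;
    // missing keys are filled in from the fallback.
    Config merged = flowConfig.withFallback(edgeConfig);
    System.out.println(merged.getString("writer.fs.uri")); // prints hdfs://flow-override
    System.out.println(merged.getString("staging.dir"));   // prints /tmp/edge
  }
}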

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
new file mode 100644
index 0000000..02dd67b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+
+
+/**
+ * An implementation of {@link PathFinder} that assumes an unweighted {@link FlowGraph} and computes the
+ * shortest path using a variant of the BFS path-finding algorithm. This implementation has two key differences from the
+ * traditional BFS implementations:
+ *  <ul>
+ *    <p> the input graph is a multi-graph i.e. there could be multiple edges between each pair of nodes, and </p>
+ *    <p> each edge has a label associated with it. In our case, the label corresponds to the set of input/output
+ *    dataset descriptors that are accepted by the edge.</p>
+ *  </ul>
+ *  Given these differences, we maintain:
+ *    <p> a {@link HashMap} of visited edges, as opposed to a list of visited
+ *  vertices as in the case of traditional BFS, and </p>
+ *    <p> for each edge, we maintain additional state that includes the input/output dataset descriptor
+ *  associated with the particular visitation of that edge. </p>
+ *  This additional information allows us to accurately mark edges as visited and guarantee termination of the algorithm.
+ */
+@Alpha
+@Slf4j
+public class BFSPathFinder extends AbstractPathFinder {
+  /**
+   * Constructor.
+   * @param flowGraph
+   */
+  public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
+    super(flowGraph, flowSpec);
+  }
+
+  /**
+   * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
+   * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
+   * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
+   * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode.
+   */
+  public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException {
+    try {
+      //Initialization of auxiliary data structures used for path computation
+      this.pathMap = new HashMap<>();
+
+      // Generate flow execution id for this compilation
+      this.flowExecutionId = System.currentTimeMillis();
+
+      //Read consistency during path computation is guaranteed by the caller: BaseFlowGraph#findPath() holds the
+      // graph's read lock while this method runs.
+        //Base condition 1: Source Node or Dest Node is inactive; return null
+        if (!srcNode.isActive() || !destNode.isActive()) {
+          log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+              destNode.getId());
+          return null;
+        }
+
+        //Base condition 2: Check if we are already at the target. If so, return an empty path.
+        if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+          return new ArrayList<>();
+        }
+
+        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+          this.pathMap.put(flowEdgeContext, flowEdgeContext);
+        }
+
+        //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges
+        // to the edge E. For each adjacent edge E', do the following:
+        //    1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
+        //    2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
+        //       edge E'. If yes, add the edge E' to the edge queue.
+        // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
+        while (!edgeQueue.isEmpty()) {
+          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+          DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+          DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+          //Are we done?
+          if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+            return constructPath(flowEdgeContext);
+          }
+
+          //Expand the currentNode to its adjacent edges and add them to the queue.
+          List<FlowEdgeContext> nextEdges =
+              getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+            //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+            // queue.
+            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+              edgeQueue.add(childFlowEdgeContext);
+              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+            }
+          }
+        }
+      //No path found. Return null.
+      return null;
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+      throw new PathFinder.PathFinderException(
+          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode.getId(), e);
+    }
+  }
+}
+
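The pathMap used above doubles as the visited set and as a parent map: first-level edges discovered from the source map to themselves, every other edge maps to the edge it was reached from, and constructPath() walks those pointers backwards to recover the path. A generic sketch of that backtracking step (class, method and edge names are illustrative, not from the commit):

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class ParentMapBacktrack {
  /** Walks parent pointers from the last discovered element back to a root that maps to itself. */
  static <T> List<T> backtrack(Map<T, T> parent, T last) {
    LinkedList<T> path = new LinkedList<>();
    T current = last;
    path.addFirst(current);
    while (!parent.get(current).equals(current)) { // roots point at themselves
      current = parent.get(current);
      path.addFirst(current);
    }
    return path;
  }

  public static void main(String[] args) {
    Map<String, String> parent = new HashMap<>();
    parent.put("edgeA", "edgeA");   // first-level edge discovered from the source
    parent.put("edgeB", "edgeA");
    parent.put("edgeC", "edgeB");
    System.out.println(backtrack(parent, "edgeC")); // prints [edgeA, edgeB, edgeC]
  }
}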

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
new file mode 100644
index 0000000..982d7f6
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+
+
+/**
+ * An interface for computing a path in a {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. Each
+ * implementation of {@link PathFinder} implements a specific path finding algorithm such as Breadth-First Search (BFS),
+ * Dijkstra's shortest-path algorithm etc.
+ */
+@Alpha
+public interface PathFinder {
+
+  public FlowGraphPath findPath() throws PathFinderException;
+
+  public static class PathFinderException extends Exception {
+    public PathFinderException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public PathFinderException(String message) {
+      super(message);
+    }
+  }
+
+}
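Taken together, the PathFinder interface, AbstractPathFinder and the flow.graph.pathfinder.class key form an extension point for alternative algorithms such as the Dijkstra variant mentioned in the javadoc above. A hypothetical skeleton of such an implementation (class and package names are made up and not part of this commit), showing what it would need to provide:

package com.example.pathfinder; // hypothetical package, not part of the commit

import java.util.List;

import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.AbstractPathFinder;

/**
 * Hypothetical weighted path finder (e.g. Dijkstra, as suggested by the PathFinder javadoc).
 * It would be selected by setting flow.graph.pathfinder.class to this class name in the flow config.
 */
public class DijkstraPathFinder extends AbstractPathFinder {

  public DijkstraPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
    // AbstractPathFinder resolves the src/dest DataNodes and DatasetDescriptors from the FlowSpec.
    super(flowGraph, flowSpec);
  }

  @Override
  public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException {
    // Sketch only: a real implementation would expand edges in cost order using the
    // inherited getNextEdges(...) helper and rebuild the result with constructPath(...).
    throw new UnsupportedOperationException("Illustrative skeleton; no algorithm implemented.");
  }
}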