You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/04 18:30:34 UTC

[gobblin] branch master updated: [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a246e2386 [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)
a246e2386 is described below

commit a246e238603bf3adbd6d30ab04242174779cca99
Author: William Lo <lo...@gmail.com>
AuthorDate: Tue Oct 4 11:30:26 2022 -0700

    [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)
    
    * Implement file based flowgraph that detects changes to the underlying filesystem and updates dynamically
    
    * Fix checkstyle
    
    * Address review, use atomic swap instead of mutable shared state for flowgraph
    
    * Fix initialization to only load flowgraph after topology specstore is loaded and for better clarity on exceptions
    
    * Change baseflowgraphlistener to a helper class so that it is extensible to future changes in flowgraph format, and can be leveraged properly by test classes
    
    * Fix checkstyle
    
    * Address review and add metrics for flow update failures
    
    * Fix tests
    
    * Migrate git flowgraph to load fully on change rather than by diff, address review
    
    * Cleanup
    
    * Fix tests
---
 .../gobblin/configuration/ConfigurationKeys.java   |  10 +-
 .../apache/gobblin/metrics/ServiceMetricNames.java |   2 +
 .../service/modules/flow/MultiHopFlowCompiler.java |  42 +--
 .../service/modules/flowgraph/BaseFlowGraph.java   |  19 +-
 ...GraphListener.java => BaseFlowGraphHelper.java} | 184 ++++++------
 .../FSPathAlterationFlowGraphListener.java         |  96 +++++++
 .../service/modules/flowgraph/FlowGraph.java       |  17 +-
 .../modules/flowgraph/FlowGraphMonitor.java        |   1 -
 .../service/monitoring/FsFlowGraphMonitor.java     | 133 +++++++++
 .../service/monitoring/GitConfigMonitor.java       |   8 +-
 .../service/monitoring/GitFlowGraphListener.java   |  77 -----
 .../service/monitoring/GitFlowGraphMonitor.java    |  79 +++---
 .../modules/flow/MultiHopFlowCompilerTest.java     |  84 +-----
 .../service/monitoring/FsFlowGraphMonitorTest.java | 315 +++++++++++++++++++++
 .../monitoring/GitFlowGraphMonitorTest.java        |  41 +--
 .../util/callbacks/CallbacksDispatcher.java        |   2 +-
 ...ionCatchingPathAlterationListenerDecorator.java |  73 +++--
 .../util/filesystem/PathAlterationListener.java    |   6 +
 .../filesystem/PathAlterationListenerAdaptor.java  |   3 +
 .../util/filesystem/PathAlterationObserver.java    |  30 +-
 20 files changed, 838 insertions(+), 384 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00817be63..cb5f664e8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1022,8 +1022,14 @@ public class ConfigurationKeys {
    * Configuration properties related to flowGraphs
    */
 
-  public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
-  public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+  public static final String FLOWGRAPH_JAVA_PROPS_EXTENSIONS = "flowGraph.javaPropsExtensions";
+  public static final String FLOWGRAPH_HOCON_FILE_EXTENSIONS = "flowGraph.hoconFileExtensions";
+  public static final String DEFAULT_PROPERTIES_EXTENSIONS = "properties";
+  public static final String DEFAULT_CONF_EXTENSIONS = "conf";
+  public static final String FLOWGRAPH_POLLING_INTERVAL = "flowGraph.pollingInterval";
+  public static final String FLOWGRAPH_BASE_DIR = "flowGraph.configBaseDirectory";
+  public static final String FLOWGRAPH_ABSOLUTE_DIR = "flowGraph.absoluteDirectory";
+
 
   /***
    * Configuration properties related to TopologySpec Store
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index eb8ff896c..94e2a82c6 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -54,4 +54,6 @@ public class ServiceMetricNames {
   public static final String JOBS_SENT_TO_SPEC_EXECUTOR = "JobsSentToSpecExecutor";
 
   public static final String HELIX_LEADER_STATE = "HelixLeaderState";
+
+  public static final String FLOWGRAPH_UPDATE_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index dc3d0bc47..1ffef3969 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
@@ -35,9 +36,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
 
-import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -52,7 +54,6 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
@@ -72,6 +73,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
 
 
 /***
@@ -81,8 +83,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Alpha
 @Slf4j
 public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
-  @Getter
-  private final FlowGraph flowGraph;
+  private AtomicReference<FlowGraph> flowGraph;
   @Getter
   private ServiceManager serviceManager;
   @Getter
@@ -111,6 +112,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     this(config, log, true);
   }
 
+  public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
+    super(config, Optional.absent(), true);
+    this.flowGraph = flowGraph;
+    this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
+  }
+
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
     try {
@@ -120,8 +127,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     } catch (RuntimeException e) {
       MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
     }
-
-    this.flowGraph = new BaseFlowGraph(dataNodeAliasMap);
+    // Use atomic reference to avoid partial flowgraph upgrades during path compilation.
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph(dataNodeAliasMap));
 
     Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = Optional.absent();
     if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
@@ -152,7 +159,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     try {
       String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName());
       this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
-        flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
+        flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
@@ -167,13 +174,6 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     }
   }
 
-  @VisibleForTesting
-  MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
-    super(config, Optional.absent(), true);
-    this.flowGraph = flowGraph;
-    this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
-  }
-
   /**
    * Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing to start polling
    * and processing changes
@@ -207,20 +207,20 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   public Dag<JobExecutionPlan> compileFlow(Spec spec) {
     Preconditions.checkNotNull(spec);
     Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
-
+    FlowGraph graph = this.flowGraph.get();
     long startTime = System.nanoTime();
 
     FlowSpec flowSpec = (FlowSpec) spec;
     String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
     String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
 
-    DataNode sourceNode = this.flowGraph.getNode(source);
+    DataNode sourceNode = graph.getNode(source);
     if (sourceNode == null) {
       flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
       return null;
     }
     List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
-    List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
+    List<DataNode> destNodes = destNodeIds.stream().map(graph::getNode).collect(Collectors.toList());
     if (destNodes.contains(null)) {
       flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
       return null;
@@ -252,7 +252,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
         }
 
         //Compute the path from source to destination.
-        FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
+        FlowGraphPath flowGraphPath = graph.findPath(datasetFlowSpec);
         if (flowGraphPath != null) {
           //Convert the path into a Dag of JobExecutionPlans.
           jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
@@ -290,6 +290,10 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     return jobExecutionPlanDag;
   }
 
+  public void setFlowGraph(FlowGraph flowGraph) {
+    this.flowGraph.set(flowGraph);
+  }
+
   /**
    * If {@link FlowSpec} has {@link ConfigurationKeys#DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a
    * provided base input and base output path to generate multiple source/destination paths.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 9f5ef0292..e54fd7cc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -45,12 +45,13 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Alpha
 @Slf4j
 public class BaseFlowGraph implements FlowGraph {
+  // Synchronize read/write access while the flowgraph is in the middle of an update
   private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
-  private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
-  private Map<String, DataNode> dataNodeMap = new HashMap<>();
-  private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
-  private Map<String, String> dataNodeAliasMap;
+  private final Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
+  private final Map<String, DataNode> dataNodeMap = new HashMap<>();
+  private final Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+  private final Map<String, String> dataNodeAliasMap;
 
   public BaseFlowGraph() {
     this(new HashMap<>());
@@ -239,15 +240,17 @@ public class BaseFlowGraph implements FlowGraph {
 
   /**{@inheritDoc}**/
   @Override
-  public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException {
+  public FlowGraphPath findPath(FlowSpec flowSpec)
+      throws PathFinder.PathFinderException, ReflectiveOperationException {
     try {
       rwLock.readLock().lock();
       //Instantiate a PathFinder.
-      Class pathFinderClass = Class.forName(ConfigUtils
-          .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
+      Class pathFinderClass = Class.forName(
+          ConfigUtils.getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
               FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
       PathFinder pathFinder =
-          (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec, dataNodeAliasMap);
+          (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec,
+              dataNodeAliasMap);
       return pathFinder.findPath();
     } finally {
       rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
similarity index 63%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index fa249c4cd..4f0b82779 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,7 +29,6 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import lombok.extern.slf4j.Slf4j;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -38,7 +38,13 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
@@ -47,55 +53,60 @@ import org.apache.gobblin.util.PullFileLoader;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
-
 /**
- * Provides the common set of functionalities needed by listeners of {@link FlowGraphMonitor} to read changes in files and
+ * Provides the common set of functionalities needed by {@link FlowGraphMonitor} to read changes in files and
  * apply them to a {@link FlowGraph}
  * Assumes that the directory structure between flowgraphs configuration files are the same.
  */
 @Slf4j
-public abstract class BaseFlowGraphListener {
-  protected FlowGraph flowGraph;
-  protected static final int NODE_FILE_DEPTH = 3;
-  protected static final int EDGE_FILE_DEPTH = 4;
+public class BaseFlowGraphHelper {
+  public static final int NODE_FILE_DEPTH = 3;
+  public static final int EDGE_FILE_DEPTH = 4;
   private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
 
   final String baseDirectory;
   private final Config emptyConfig = ConfigFactory.empty();
 
-  private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+  private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
   private final Map<URI, TopologySpec> topologySpecMap;
-
+  private MetricContext metricContext;
   final String flowGraphFolderName;
   final PullFileLoader pullFileLoader;
   final Set<String> javaPropsExtensions;
   final Set<String> hoconFileExtensions;
+  private final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
 
-  public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
-      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
-      String javaPropsExtentions, String hoconFileExtensions) {
+  public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, boolean instrumentationEnabled, Config config) {
     this.flowTemplateCatalog = flowTemplateCatalog;
-    this.flowGraph = graph;
     this.topologySpecMap = topologySpecMap;
     this.baseDirectory = baseDirectory;
     this.flowGraphFolderName = flowGraphFolderName;
     Path folderPath = new Path(baseDirectory, this.flowGraphFolderName);
     this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
     this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
+    if (instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), BaseFlowGraphHelper.class);
+      this.flowGraphUpdateFailedMeter = Optional.of(this.metricContext.contextAwareMeter(ServiceMetricNames.FLOWGRAPH_UPDATE_FAILED_METER));
+    } else {
+      this.flowGraphUpdateFailedMeter = Optional.absent();
+    }
     try {
       this.pullFileLoader = new PullFileLoader(folderPath,
-          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
-          this.javaPropsExtensions, this.hoconFileExtensions);
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), this.javaPropsExtensions,
+          this.hoconFileExtensions);
     } catch (IOException e) {
       throw new RuntimeException("Could not create pull file loader", e);
     }
   }
+
   /**
    * Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
    * to instantiate a {@link DataNode} from the node config file.
    * @param path of node to add
    */
-  protected void addDataNode(String path) {
+  protected void addDataNode(FlowGraph graph, String path) {
     if (checkFilePath(path, NODE_FILE_DEPTH)) {
       Path nodeFilePath = new Path(this.baseDirectory, path);
       try {
@@ -103,52 +114,39 @@ public abstract class BaseFlowGraphListener {
         Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
         DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
-        if (!this.flowGraph.addDataNode(dataNode)) {
+        if (!graph.addDataNode(dataNode)) {
           log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
         } else {
           log.info("Added Datanode {} to FlowGraph", dataNode.getId());
         }
       } catch (Exception e) {
+        if (this.flowGraphUpdateFailedMeter.isPresent()) {
+          this.flowGraphUpdateFailedMeter.get().mark();
+        }
         log.warn("Could not add DataNode defined in {} due to exception {}", path, e);
       }
     }
   }
 
-  /**
-   * Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts the nodeId of the
-   * {@link DataNode} from the node config file and uses it to delete the associated {@link DataNode}.
-   * @param path of node to delete
-   */
-  protected void removeDataNode(String path) {
-    if (checkFilePath(path, NODE_FILE_DEPTH)) {
-      Path nodeFilePath = new Path(this.baseDirectory, path);
-      Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), nodeFilePath);
-      String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
-      if (!this.flowGraph.deleteDataNode(nodeId)) {
-        log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId);
-      } else {
-        log.info("Removed DataNode {} from FlowGraph", nodeId);
-      }
-    }
-  }
-
   /**
    * Add a {@link FlowEdge} to the {@link FlowGraph}. The method uses the {@link FlowEdgeFactory} instance
    * provided by the {@link FlowGraph} to build a {@link FlowEdge} from the edge config file.
    * @param path of edge to add
    */
-  protected void addFlowEdge(String path) {
+  protected void addFlowEdge(FlowGraph graph, String path) {
     if (checkFilePath(path, EDGE_FILE_DEPTH)) {
       Path edgeFilePath = new Path(this.baseDirectory, path);
       try {
         Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
         List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
-        Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
-            FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
-        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
+        Class flowEdgeFactoryClass = Class.forName(
+            ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+                FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+        FlowEdgeFactory flowEdgeFactory =
+            (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
         if (flowTemplateCatalog.isPresent()) {
           FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowTemplateCatalog.get(), specExecutors);
-          if (!this.flowGraph.addFlowEdge(edge)) {
+          if (!graph.addFlowEdge(edge)) {
             log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
           } else {
             log.info("Added edge {} to FlowGraph", edge.getId());
@@ -157,37 +155,17 @@ public abstract class BaseFlowGraphListener {
           log.warn("Could not add edge defined in {} to FlowGraph as FlowTemplateCatalog is absent", path);
         }
       } catch (Exception e) {
-        log.warn("Could not add edge defined in {} due to exception {}", path, e.getMessage());
-      }
-    }
-  }
-
-  /**
-   * Remove a {@link FlowEdge} from the {@link FlowGraph}. The method uses {@link FlowEdgeFactory}
-   * to construct the edgeId of the {@link FlowEdge} from the config file and uses it to delete the associated
-   * {@link FlowEdge}.
-   * @param path of edge to delete
-   */
-  protected void removeFlowEdge(String path) {
-    if (checkFilePath(path, EDGE_FILE_DEPTH)) {
-      Path edgeFilePath = new Path(this.baseDirectory, path);
-      try {
-        Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath);
-        String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
-        if (!this.flowGraph.deleteFlowEdge(edgeId)) {
-          log.warn("Could not remove edge {} from FlowGraph; skipping", edgeId);
-        } else {
-          log.info("Removed edge {} from FlowGraph", edgeId);
+        log.warn("Could not add edge defined in {} due to exception", path, e);
+        if (this.flowGraphUpdateFailedMeter.isPresent()) {
+          this.flowGraphUpdateFailedMeter.get().mark();
         }
-      } catch (Exception e) {
-        log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage());
       }
     }
   }
 
   /**
-   * check whether the file has the proper naming and hierarchy
-   * @param file the relative path from the repo root
+   * check whether the file has the proper naming and hierarchy for nodes and edges
+   * @param file the relative path from the root of the flowgraph
    * @return false if the file does not conform
    */
   private boolean checkFilePath(String file, int depth) {
@@ -197,10 +175,9 @@ public abstract class BaseFlowGraphListener {
 
     Path filePath = new Path(file);
     String fileExtension = Files.getFileExtension(filePath.getName());
-    if (filePath.depth() != depth || !checkFileLevelRelativeToRoot(filePath, depth)
-        || !(this.javaPropsExtensions.contains(fileExtension) || this.hoconFileExtensions.contains(fileExtension))) {
-      log.warn("Changed file does not conform to directory structure and file name format, skipping: "
-          + filePath);
+    if (!checkFileLevelRelativeToRoot(filePath, depth) || !(this.javaPropsExtensions.contains(fileExtension)
+        || this.hoconFileExtensions.contains(fileExtension))) {
+      log.warn("Changed file does not conform to directory structure and file name format, skipping: " + filePath);
       return false;
     }
     return true;
@@ -212,7 +189,7 @@ public abstract class BaseFlowGraphListener {
    * @param depth expected depth of the file
    * @return true if the file conforms to the expected hierarchy
    */
-  private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
+  public boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
     if (filePath == null) {
       return false;
     }
@@ -220,10 +197,7 @@ public abstract class BaseFlowGraphListener {
     for (int i = 0; i < depth - 1; i++) {
       path = path.getParent();
     }
-    if (!path.getName().equals(flowGraphFolderName)) {
-      return false;
-    }
-    return true;
+    return path.getName().equals(flowGraphFolderName);
   }
 
   /**
@@ -243,14 +217,15 @@ public abstract class BaseFlowGraphListener {
    * @param edgeFilePath path of the edge file
    * @return config with overridden edge properties
    */
-  private Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
+   protected Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
     String source = edgeFilePath.getParent().getParent().getName();
     String destination = edgeFilePath.getParent().getName();
     String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName());
 
     return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
         .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination))
-        .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
+        .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
+            ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
   }
 
   /**
@@ -259,13 +234,18 @@ public abstract class BaseFlowGraphListener {
    * @param edgeConfig containing the logical names of SpecExecutors for this edge.
    * @return a {@link List<SpecExecutor>}s for this edge.
    */
-  private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxException {
+  private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
+      throws URISyntaxException, IOException {
     //Get the logical names of SpecExecutors where the FlowEdge can be executed.
-    List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+    List<String> specExecutorNames =
+        ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
     //Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog.
     List<SpecExecutor> specExecutors = new ArrayList<>(specExecutorNames.size());
-    for (String specExecutorName: specExecutorNames) {
+    for (String specExecutorName : specExecutorNames) {
       URI specExecutorUri = new URI(specExecutorName);
+      if (!this.topologySpecMap.containsKey(specExecutorUri)) {
+        throw new IOException(String.format("Spec executor %s does not exist in the topologySpecStore.", specExecutorUri));
+      }
       specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
     }
     return specExecutors;
@@ -277,23 +257,60 @@ public abstract class BaseFlowGraphListener {
    * @return the configuration object
    * @throws IOException
    */
-  protected Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadNodeFileWithOverrides(Path filePath)
+      throws IOException {
     Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
     return getNodeConfigWithOverrides(nodeConfig, filePath);
   }
 
-
   /**
    * Load the edge file.
    * @param filePath path of the edge file relative to the repository root
    * @return the configuration object
    * @throws IOException
    */
-  protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadEdgeFileWithOverrides(Path filePath)
+      throws IOException {
     Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
     return getEdgeConfigWithOverrides(edgeConfig, filePath);
   }
 
+  /**
+   * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOWGRAPH_BASE_DIR }
+   * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+   * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+   * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+   */
+  public FlowGraph generateFlowGraph() {
+    FlowGraph newFlowGraph = new BaseFlowGraph();
+    java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
+    try {
+      List<Path> edges = new ArrayList<>();
+      // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
+      // Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
+      java.nio.file.Files.walk(graphPath).forEach(fileName -> {
+        if (!java.nio.file.Files.isDirectory(fileName)) {
+          if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
+            addDataNode(newFlowGraph, fileName.toString());
+          } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+            edges.add(new Path(fileName.toString()));
+          }
+        }
+      });
+      for (Path edge : edges) {
+        addFlowEdge(newFlowGraph, edge.toString());
+      }
+      return newFlowGraph;
+    } catch (IOException e) {
+      // Log and report error, but do not break or crash the flowgraph so that currently running flows can continue
+      if (this.flowGraphUpdateFailedMeter.isPresent()) {
+        this.flowGraphUpdateFailedMeter.get().mark();
+      }
+      log.error(String.format("Error while populating file based flowgraph at path %s", graphPath), e);
+      return null;
+    }
+  }
+
   /**
    * Get an edge label from the edge properties
    * @param source source data node id
@@ -301,8 +318,7 @@ public abstract class BaseFlowGraphListener {
    * @param edgeName simple name of the edge (e.g. file name without extension of the edge file)
    * @return a string label identifying the edge
    */
-  private String getEdgeId(String source, String destination, String edgeName) {
+  public String getEdgeId(String source, String destination, String edgeName) {
     return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(source, destination, edgeName);
   }
-
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
new file mode 100644
index 000000000..1700d8805
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+
+
+/**
+ * An implementation of {@link PathAlterationListener} to listen for changes in a directory and apply it to a GaaS FlowGraph
+ * Is invoked by {@link PathAlterationObserver} which would check a folder and perform recursive comparisons on files compared to
+ * their last polled state. On any detected differences in files when a check is done, the {@link FlowGraph} will be updated.
+ *
+ */
+@Slf4j
+public class FSPathAlterationFlowGraphListener implements PathAlterationListener {
+  private final MultiHopFlowCompiler compiler;
+  private final BaseFlowGraphHelper flowGraphHelper;
+
+  public FSPathAlterationFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper flowGraphHelper) {
+    this.flowGraphHelper = flowGraphHelper;
+    File graphDir = new File(baseDirectory);
+    // Populate the flowgraph with any existing files
+    if (!graphDir.exists()) {
+      throw new RuntimeException(String.format("Flowgraph directory at path %s does not exist!", graphDir));
+    }
+    this.compiler = compiler;
+  }
+
+  @Override
+  public void onStart(final PathAlterationObserver observer) {
+  }
+
+  @Override
+  public void onFileCreate(final Path path) {
+  }
+
+  @Override
+  public void onFileChange(final Path path) {
+  }
+
+  @Override
+  public void onStop(final PathAlterationObserver observer) {
+  }
+
+  @Override
+  public void onDirectoryCreate(final Path directory) {
+  }
+
+  @Override
+  public void onDirectoryChange(final Path directory) {
+  }
+
+  @Override
+  public void onDirectoryDelete(final Path directory) {
+  }
+
+  @Override
+  public void onFileDelete(final Path path) {
+  }
+
+  @Override
+  public void onCheckDetectedChange() {
+    log.info("Detecting change in flowgraph files, reloading flowgraph");
+    FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
+    if (newGraph != null) {
+      this.compiler.setFlowGraph(newGraph);
+    }
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 47af72677..2f0a81264 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -39,35 +39,35 @@ public interface FlowGraph {
    * @param nodeId {@link DataNode} identifier.
    * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
    */
-  public DataNode getNode(String nodeId);
+  DataNode getNode(String nodeId);
 
   /**
    * Add a {@link DataNode} to the {@link FlowGraph}
    * @param node {@link DataNode} to be added
    * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
    */
-  public boolean addDataNode(DataNode node);
+  boolean addDataNode(DataNode node);
 
   /**
    * Add a {@link FlowEdge} to the {@link FlowGraph}
    * @param edge {@link FlowEdge} to be added
    * @return true if {@link FlowEdge} is added to the {@link FlowGraph} successfully.
    */
-  public boolean addFlowEdge(FlowEdge edge);
+  boolean addFlowEdge(FlowEdge edge);
 
   /**
    * Remove a {@link DataNode} and all its incident edges from the {@link FlowGraph}
    * @param nodeId identifier of the {@link DataNode} to be removed
    * @return true if {@link DataNode} is removed from the {@link FlowGraph} successfully.
    */
-  public boolean deleteDataNode(String nodeId);
+  boolean deleteDataNode(String nodeId);
 
   /**
    * Remove a {@link FlowEdge} from the {@link FlowGraph}
    * @param edgeId label of the edge to be removed
    * @return true if edge is removed from the {@link FlowGraph} successfully.
    */
-  public boolean deleteFlowEdge(String edgeId);
+  boolean deleteFlowEdge(String edgeId);
 
   /**
    * Get a collection of edges adjacent to a {@link DataNode}. Useful for path finding algorithms and graph
@@ -75,14 +75,14 @@ public interface FlowGraph {
    * @param nodeId identifier of the {@link DataNode}
    * @return a collection of edges adjacent to the {@link DataNode}
    */
-  public Collection<FlowEdge> getEdges(String nodeId);
+  Collection<FlowEdge> getEdges(String nodeId);
 
   /**
    * Get a collection of edges adjacent to a {@link DataNode}.
    * @param node {@link DataNode}
    * @return a collection of edges adjacent to the {@link DataNode}
    */
-  public Collection<FlowEdge> getEdges(DataNode node);
+  Collection<FlowEdge> getEdges(DataNode node);
 
   /**
    * A method that takes a {@link FlowSpec} containing the source and destination {@link DataNode}s, as well as the
@@ -95,5 +95,6 @@ public interface FlowGraph {
    * @return an instance of {@link FlowGraphPath} that encapsulates a sequence of {@link org.apache.gobblin.runtime.api.JobSpec}s
    * satisfying flowSpec.
    */
-  public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException;
+  FlowGraphPath findPath(FlowSpec flowSpec)
+      throws PathFinder.PathFinderException, ReflectiveOperationException;
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
index 7c39b0157..bdf551172 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
@@ -30,5 +30,4 @@ public interface FlowGraphMonitor extends Service {
    * @param value whether GaaS is ready
    */
   void setActive(boolean value);
-
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
new file mode 100644
index 000000000..6183e7d66
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+  public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+  private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR = "/tmp/fsFlowgraph";
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+  private volatile boolean isActive = false;
+  private final long pollingInterval;
+  private BaseFlowGraphHelper flowGraphHelper;
+  private final PathAlterationObserverScheduler pathAlterationDetector;
+  private final FSPathAlterationFlowGraphListener listener;
+  private final PathAlterationObserver observer;
+  private final Path flowGraphPath;
+  private final MultiHopFlowCompiler compiler;
+  private final CountDownLatch initComplete;
+  private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+      .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
+
+  public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      MultiHopFlowCompiler compiler, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete, boolean instrumentationEnabled)
+      throws IOException {
+    Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.pollingInterval =
+        TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+    this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
+    this.observer = new PathAlterationObserver(flowGraphPath);
+    this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+    this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, compiler, flowGraphPath.toString(), this.flowGraphHelper);
+
+   this.compiler = compiler;
+   this.initComplete = initComplete;
+
+    if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
+      this.pathAlterationDetector = null;
+    } else {
+      this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
+      Optional<PathAlterationObserver> observerOptional = Optional.fromNullable(observer);
+      this.pathAlterationDetector.addPathAlterationObserver(this.listener, observerOptional,
+          this.flowGraphPath);
+    }
+  }
+
+  @Override
+  protected void startUp()
+      throws IOException {
+  }
+
+  @Override
+  public synchronized void setActive(boolean isActive) {
+    log.info("Setting the flow graph monitor to be " + isActive + " from " + this.isActive);
+    if (this.isActive == isActive) {
+      // No-op if already in correct state
+      return;
+    } else if (isActive) {
+      if (this.pathAlterationDetector != null) {
+        log.info("Starting the " + getClass().getSimpleName());
+        log.info("Polling flowgraph folder with interval {} ", this.pollingInterval);
+        try {
+          this.pathAlterationDetector.start();
+          // Manually instantiate flowgraph when the monitor becomes active
+          this.compiler.setFlowGraph(this.flowGraphHelper.generateFlowGraph());
+          // Reduce the countdown latch
+          this.initComplete.countDown();
+          log.info("Finished populating FSFlowgraph");
+        } catch (IOException e) {
+          log.error("Could not initialize pathAlterationDetector due to error: ", e);
+        }
+      } else {
+        log.warn("No path alteration detector found");
+      }
+    }
+    this.isActive = isActive;
+  }
+
+  /** Stop the service. */
+  @Override
+  protected void shutDown()
+      throws Exception {
+    this.pathAlterationDetector.stop();
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
index e7f639ce8..0d47e536d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
@@ -54,8 +54,8 @@ public class GitConfigMonitor extends GitMonitoringService {
           .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
           .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
           .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
-          .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
-          .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+          .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+          .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
           .build());
 
   private final FlowCatalog flowCatalog;
@@ -66,8 +66,8 @@ public class GitConfigMonitor extends GitMonitoringService {
     this.flowCatalog = flowCatalog;
     Config configWithFallbacks = config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
     this.listeners.add(new GitConfigListener(flowCatalog, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
-        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
-        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS)));
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS)));
   }
 
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
deleted file mode 100644
index 1f5a16e7d..000000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.eclipse.jgit.diff.DiffEntry;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphListener;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
-
-
-/**
- * Listener for {@link GitFlowGraphMonitor} to apply changes from Git to a {@link FlowGraph}
- */
-public class GitFlowGraphListener extends BaseFlowGraphListener implements GitDiffListener {
-
-  public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
-      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String folderName, String javaPropsExtentions,
-      String hoconFileExtentions) {
-    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, folderName, javaPropsExtentions, hoconFileExtentions);
-
-  }
-
-  /**
-   * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
-   * the {@link FlowGraph} for an added, updated or modified node or edge file.
-   * @param change
-   */
-  @Override
-  public void addChange(DiffEntry change) {
-    Path path = new Path(change.getNewPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      addDataNode(change.getNewPath());
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      addFlowEdge(change.getNewPath());
-    }
-  }
-
-  /**
-   * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge} from the {@link FlowGraph} for
-   * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
-   * @param change
-   */
-  @Override
-  public void removeChange(DiffEntry change) {
-    Path path = new Path(change.getOldPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      removeDataNode(change.getOldPath());
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      removeFlowEdge(change.getOldPath());
-    }
-  }
-}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
index bfe811fa0..684ab2c90 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -18,14 +18,11 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.hadoop.fs.Path;
 import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
 
@@ -33,16 +30,20 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 
+
 /**
  * Service that monitors for changes to {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
  * The git repository must have an inital commit that has no files since that is used as a base for getting
@@ -54,8 +55,6 @@ import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog
 @Slf4j
 public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGraphMonitor {
   public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.gitFlowGraphMonitor";
-  private static final String PROPERTIES_EXTENSIONS = "properties";
-  private static final String CONF_EXTENSIONS = "conf";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
   private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
@@ -67,26 +66,26 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
       .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
-      .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
-      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
-      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
-      .put(SHOULD_CHECKPOINT_HASHES, false)
-      .build());
+      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+      .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+      .put(SHOULD_CHECKPOINT_HASHES, false).build());
 
-  private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+  private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
   private final CountDownLatch initComplete;
+  private final BaseFlowGraphHelper flowGraphHelper;
+  private final MultiHopFlowCompiler multihopFlowCompiler;
 
-  public GitFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
-      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+  public GitFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog, MultiHopFlowCompiler compiler
+      , Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete, boolean instrumentationEnabled) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     Config configWithFallbacks = config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
     this.flowTemplateCatalog = flowTemplateCatalog;
     this.initComplete = initComplete;
-    this.listeners.add(new GitFlowGraphListener(
-        flowTemplateCatalog, graph, topologySpecMap, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
-        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
-        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS))
-    );
+    this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+    this.multihopFlowCompiler = compiler;
   }
 
   /**
@@ -99,39 +98,27 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
   }
 
   /**
-   * Sort the changes in a commit so that changes to node files appear before changes to edge files. This is done so that
-   * node related changes are applied to the FlowGraph before edge related changes. An example where the order matters
-   * is the case when a commit adds a new node n2 as well as adds an edge from an existing node n1 to n2. To ensure that the
-   * addition of edge n1->n2 is successful, node n2 must exist in the graph and so needs to be added first. For deletions,
-   * the order does not matter and ordering the changes in the commit will result in the same FlowGraph state as if the changes
-   * were unordered. In other words, deletion of a node deletes all its incident edges from the FlowGraph. So processing an
-   * edge deletion later results in a no-op. Note that node and edge files do not change depth in case of modifications.
-   *
-   * If there are multiple commits between successive polls to Git, the re-ordering of changes across commits should not
-   * affect the final state of the FlowGraph. This is because, the order of changes for a given file type (i.e. node or edge)
-   * is preserved.
+   * Reprocesses the entire flowgraph from the root folder every time a change in git is detected
    */
   @Override
-  void processGitConfigChanges() throws GitAPIException, IOException {
+  void processGitConfigChanges()
+      throws GitAPIException, IOException {
+    // Pulls repository to latest and grabs changes
+    List<DiffEntry> changes = this.gitRepo.getChanges();
     if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
       log.info("Change to template catalog detected, refreshing FlowGraph");
       this.gitRepo.initRepository();
+    } else if (changes.isEmpty()) {
+      return;
     }
+    log.info("Detected changes in flowGraph, refreshing Flowgraph");
 
-    List<DiffEntry> changes = this.gitRepo.getChanges();
-    changes.sort(new GitFlowgraphComparator());
-    processGitConfigChangesHelper(changes);
-    //Decrements the latch count. The countdown latch is initialized to 1. So after the first time the latch is decremented,
-    // the following operation should be a no-op.
-    this.initComplete.countDown();
-  }
-
-  static class GitFlowgraphComparator implements Comparator<DiffEntry>, Serializable {
-    public int compare(DiffEntry o1, DiffEntry o2) {
-      Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
-      Integer o2Depth = (o2.getNewPath() != null) ? (new Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
-      return o1Depth.compareTo(o2Depth);
+    FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
+    if (newGraph != null) {
+      this.multihopFlowCompiler.setFlowGraph(newGraph);
     }
+    // Noop if flowgraph is already initialized
+    this.initComplete.countDown();
+    this.gitRepo.moveCheckpointAndHashesForward();
   }
-
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 0f5a66753..4c3d4f137 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,7 +33,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,12 +43,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -54,7 +50,6 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
@@ -64,7 +59,6 @@ import com.typesafe.config.ConfigSyntax;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -75,7 +69,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
@@ -95,7 +88,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 @Slf4j
 public class MultiHopFlowCompilerTest {
-  private FlowGraph flowGraph;
+  private AtomicReference<FlowGraph> flowGraph;
   private MultiHopFlowCompiler specCompiler;
   private final String TESTDIR = "/tmp/mhCompiler/gitFlowGraphTestDir";
 
@@ -103,7 +96,7 @@ public class MultiHopFlowCompilerTest {
   public void setUp()
       throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
     //Create a FlowGraph
-    this.flowGraph = new BaseFlowGraph();
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
 
     //Add DataNodes to the graph from the node properties files
     URI dataNodesUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
@@ -119,7 +112,7 @@ public class MultiHopFlowCompilerTest {
         Class dataNodeClass = Class.forName(ConfigUtils
             .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
         DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
-        this.flowGraph.addDataNode(dataNode);
+        this.flowGraph.get().addDataNode(dataNode);
       }
     }
 
@@ -153,7 +146,7 @@ public class MultiHopFlowCompilerTest {
           specExecutors.add(topologySpecMap.get(new URI(specExecutorName)).getSpecExecutor());
         }
         FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog, specExecutors);
-        this.flowGraph.addFlowEdge(edge);
+        this.flowGraph.get().addFlowEdge(edge);
       }
     }
     this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
@@ -400,7 +393,7 @@ public class MultiHopFlowCompilerTest {
   @Test (dependsOnMethods = "testCompileFlowWithRetention")
   public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException {
     //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
+    this.flowGraph.get().deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
 
     FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -522,7 +515,7 @@ public class MultiHopFlowCompilerTest {
   @Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
   public void testCompileFlowAfterSecondEdgeDeletion() throws URISyntaxException, IOException {
     //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
-    this.flowGraph.deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
+    this.flowGraph.get().deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
 
     FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -674,67 +667,6 @@ public class MultiHopFlowCompilerTest {
     spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains("Flowgraph does not have a node with id"));
   }
 
-  @Test (dependsOnMethods = "testMissingDestinationNodeError")
-  public void testGitFlowGraphMonitorService()
-      throws IOException, GitAPIException, URISyntaxException, InterruptedException {
-    File remoteDir = new File(TESTDIR + "/remote");
-    File cloneDir = new File(TESTDIR + "/clone");
-    File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
-
-    //Clean up
-    cleanUpDir(TESTDIR);
-
-    // Create a bare repository
-    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
-    Repository remoteRepo = fileKey.open(false);
-    remoteRepo.create(true);
-
-    Git gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
-    // push an empty commit as a base for detecting changes
-    gitForPush.commit().setMessage("First commit").call();
-    RefSpec masterRefSpec = new RefSpec("master");
-    gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
-
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
-    Config config = ConfigBuilder.create()
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
-            + ConfigurationKeys.GIT_MONITOR_REPO_URI, remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TESTDIR + "/git-flowgraph")
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
-        .addPrimitive(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString())
-        .build();
-
-    //Create a MultiHopFlowCompiler instance
-    specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false);
-
-    specCompiler.setActive(true);
-
-    //Ensure node1 is not present in the graph
-    Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));
-
-    // push a new node file
-    File nodeDir = new File(flowGraphDir, "node1");
-    File nodeFile = new File(nodeDir, "node1.properties");
-    nodeDir.mkdirs();
-    nodeFile.createNewFile();
-    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=val1" + "\n", nodeFile, Charsets.UTF_8);
-
-    // add, commit, push node
-    gitForPush.add().addFilepattern(formNodeFilePath(flowGraphDir, nodeDir.getName(), nodeFile.getName())).call();
-    gitForPush.commit().setMessage("Node commit").call();
-    gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
-
-    // polling is every 5 seconds, so wait twice as long and check
-    TimeUnit.SECONDS.sleep(10);
-
-    //Test that a DataNode is added to FlowGraph
-    DataNode dataNode = specCompiler.getFlowGraph().getNode("node1");
-    Assert.assertEquals(dataNode.getId(), "node1");
-    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "val1");
-  }
-
   private String formNodeFilePath(File flowGraphDir, String groupDir, String fileName) {
     return flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
new file mode 100644
index 000000000..0c0799c23
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+  private static final Logger logger = LoggerFactory.getLogger(FsFlowGraphMonitorTest.class);
+  private final File TEST_DIR = new File(FileUtils.getTempDirectory(), "fsFlowGraphTestDir");
+  private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(FileUtils.getTempDirectory(), "node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(FileUtils.getTempDirectory(), "node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private Optional<FSFlowTemplateCatalog> flowCatalog;
+  private Config config;
+  private AtomicReference<FlowGraph> flowGraph;
+  private FsFlowGraphMonitor flowGraphMonitor;
+  private Map<URI, TopologySpec> topologySpecMap;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR.toString());
+
+
+    URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+    this.topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, this.flowGraphDir.getAbsolutePath())
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
+        .build();
+
+    // Create a FSFlowTemplateCatalog instance
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    this.flowGraphDir.mkdirs();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+    MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+    this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, this.flowCatalog, mhfc, topologySpecMap, new CountDownLatch(1), true);
+    this.flowGraphMonitor.startUp();
+    this.flowGraphMonitor.setActive(true);
+  }
+
+  @Test
+  public void testAddNode() throws Exception {
+    String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value1\n";
+    String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value2\n";
+
+    addNode(this.node1Dir, this.node1File, file1Contents);
+    addNode(this.node2Dir, this.node2File, file2Contents);
+
+    // Let the monitor pick up the nodes that were recently added
+    Thread.sleep(3000);
+    for (int i = 0; i < 2; i++) {
+      String nodeId = "node" + (i + 1);
+      String paramKey = "param" + (i + 1);
+      String paramValue = "value" + (i + 1);
+      //Check if nodes have been added to the FlowGraph
+      DataNode dataNode = this.flowGraph.get().getNode(nodeId);
+      Assert.assertEquals(dataNode.getId(), nodeId);
+      Assert.assertTrue(dataNode.isActive());
+      Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), paramValue);
+    }
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testAddEdge() throws Exception {
+    //Build contents of edge file
+    String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value1");
+    addEdge(this.edge1Dir, this.edge1File, fileContents);
+    // Let the monitor pick up the edges that were recently added
+    Thread.sleep(3000);
+
+    //Check if edge1 has been added to the FlowGraph
+    testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
+  }
+
+  @Test (dependsOnMethods = "testAddEdge")
+  public void testUpdateEdge() throws Exception {
+    //Update edge1 file
+    String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value2");
+    addEdge(this.edge1Dir, this.edge1File, fileContents);
+    // Let the monitor pick up the edges that were recently added
+    Thread.sleep(3000);
+
+    //Check if new edge1 has been added to the FlowGraph
+    testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value2");
+  }
+
+  @Test (dependsOnMethods = "testUpdateEdge")
+  public void testUpdateNode() throws Exception {
+    //Update param1 value in node1 and check if updated node is added to the graph
+    String fileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value3\n";
+    addNode(this.node1Dir, this.node1File, fileContents);
+    // Let the monitor pick up the edges that were recently added
+    Thread.sleep(3000);
+    //Check if node has been updated in the FlowGraph
+    DataNode dataNode = this.flowGraph.get().getNode("node1");
+    Assert.assertEquals(dataNode.getId(), "node1");
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3");
+  }
+
+  @Test (dependsOnMethods = "testUpdateNode")
+  public void testSetUpExistingGraph() throws Exception {
+    // Create a FlowGraph instance with defaults
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+    MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+
+    this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, this.flowCatalog, mhfc, this.topologySpecMap, new CountDownLatch(1), true);
+    this.flowGraphMonitor.startUp();
+    this.flowGraphMonitor.setActive(true);
+
+    // Let the monitor repopulate the flowgraph
+    Thread.sleep(3000);
+
+    Assert.assertNotNull(this.flowGraph.get().getNode("node1"));
+    Assert.assertNotNull(this.flowGraph.get().getNode("node2"));
+    Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+
+  }
+
+  @Test (dependsOnMethods = "testSetUpExistingGraph")
+  public void testRemoveEdge() throws Exception {
+    //Node1 has 1 edge before delete
+    Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    File edgeFile = new File(this.flowGraphDir.getAbsolutePath(), node1Dir.getName() + Path.SEPARATOR_CHAR + edge1Dir.getName() + Path.SEPARATOR_CHAR + edge1File.getName());
+
+    edgeFile.delete();
+    // Let the monitor pick up the edges that were recently deleted
+    Thread.sleep(3000);
+
+    //Check if edge1 has been deleted from the graph
+    edgeSet = this.flowGraph.get().getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRemoveEdge")
+  public void testRemoveNode() throws Exception {
+    //Ensure node1 and node2 are present in the graph before delete
+    DataNode node1 = this.flowGraph.get().getNode("node1");
+    Assert.assertNotNull(node1);
+    DataNode node2 = this.flowGraph.get().getNode("node2");
+    Assert.assertNotNull(node2);
+
+
+    File node1FlowGraphFile = new File(this.flowGraphDir.getAbsolutePath(), node1Dir.getName());
+    File node2FlowGraphFile = new File(this.flowGraphDir.getAbsolutePath(), node2Dir.getName());
+    //delete node files
+    FileUtils.deleteDirectory(node1FlowGraphFile);
+    FileUtils.deleteDirectory(node2FlowGraphFile);
+
+    // Let the monitor pick up the edges that were recently deleted
+    Thread.sleep(3000);
+
+    //Check if node1 and node 2 have been deleted from the graph
+    node1 = this.flowGraph.get().getNode("node1");
+    Assert.assertNull(node1);
+    node2 = this.flowGraph.get().getNode("node2");
+    Assert.assertNull(node2);
+  }
+
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    cleanUpDir(TEST_DIR.toString());
+  }
+
+  private void createNewFile(File dir, File file, String fileContents) throws IOException {
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    file.createNewFile();
+    Files.write(fileContents, file, Charsets.UTF_8);
+  }
+
+  private void addNode(File nodeDir, File nodeFile, String fileContents) throws IOException {
+    createNewFile(nodeDir, nodeFile, fileContents);
+    File destinationFile = new File(this.flowGraphDir.getAbsolutePath(), nodeDir.getName() + Path.SEPARATOR_CHAR + nodeFile.getName());
+    logger.info(destinationFile.toString());
+    if (destinationFile.exists()) {
+      // clear file
+      Files.write(new byte[0], destinationFile);
+      Files.write(fileContents, destinationFile, Charsets.UTF_8);
+    } else {
+      FileUtils.moveDirectory(nodeDir, destinationFile.getParentFile());
+    }
+  }
+
+  private void addEdge(File edgeDir, File edgeFile, String fileContents) throws Exception {
+    createNewFile(edgeDir, edgeFile, fileContents);
+    File destinationFile =  new File(this.flowGraphDir.getAbsolutePath(), edgeDir.getParentFile().getName() + Path.SEPARATOR_CHAR +  edgeDir.getName() + Path.SEPARATOR_CHAR + edgeFile.getName());
+    if (destinationFile.exists()) {
+      // clear old properties file
+      Files.write(new byte[0], destinationFile);
+      Files.write(fileContents, destinationFile, Charsets.UTF_8);
+    } else {
+      FileUtils.moveDirectory(edgeDir, destinationFile.getParentFile());
+    }
+  }
+
+  private String buildEdgeFileContents(String node1, String node2, String edgeName, String value) {
+    String fileContents = FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=" + node1 + "\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=" + node2 + "\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "=testExecutor1,testExecutor2\n"
+        + "key1=" + value + "\n";
+    return fileContents;
+  }
+
+  private void testIfEdgeSuccessfullyAdded(String node1, String node2, String edgeName, String value) throws ExecutionException, InterruptedException {
+    Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges(node1);
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2, edgeName));
+    Assert.assertEquals(flowEdge.getSrc(), node1);
+    Assert.assertEquals(flowEdge.getDest(), node2);
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getConfig().getString("key1"), value);
+  }
+
+  private void cleanUpDir(String dir) {
+    File specStoreDir = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (specStoreDir.exists()) {
+          FileUtils.deleteDirectory(specStoreDir);
+        }
+        // if delete succeeded then break out of loop
+        break;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
index efa34e1ec..e6cd3354b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
@@ -21,14 +21,18 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
+
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.dircache.DirCache;
@@ -83,7 +87,7 @@ public class GitFlowGraphMonitorTest {
   private RefSpec masterRefSpec = new RefSpec("master");
   private Optional<FSFlowTemplateCatalog> flowCatalog;
   private Config config;
-  private BaseFlowGraph flowGraph;
+  private AtomicReference<FlowGraph> flowGraph;
   private GitFlowGraphMonitor gitFlowGraphMonitor;
 
   @BeforeClass
@@ -120,11 +124,10 @@ public class GitFlowGraphMonitorTest {
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+    MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
 
-    //Create a FlowGraph instance with defaults
-    this.flowGraph = new BaseFlowGraph();
-
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, mhfc, topologySpecMap, new CountDownLatch(1), true);
     this.gitFlowGraphMonitor.setActive(true);
   }
 
@@ -143,7 +146,7 @@ public class GitFlowGraphMonitorTest {
       String paramKey = "param" + (i + 1);
       String paramValue = "value" + (i + 1);
       //Check if nodes have been added to the FlowGraph
-      DataNode dataNode = this.flowGraph.getNode(nodeId);
+      DataNode dataNode = this.flowGraph.get().getNode(nodeId);
       Assert.assertEquals(dataNode.getId(), nodeId);
       Assert.assertTrue(dataNode.isActive());
       Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), paramValue);
@@ -191,7 +194,7 @@ public class GitFlowGraphMonitorTest {
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
     //Check if node has been updated in the FlowGraph
-    DataNode dataNode = this.flowGraph.getNode("node1");
+    DataNode dataNode = this.flowGraph.get().getNode("node1");
     Assert.assertEquals(dataNode.getId(), "node1");
     Assert.assertTrue(dataNode.isActive());
     Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3");
@@ -204,7 +207,7 @@ public class GitFlowGraphMonitorTest {
     edge1File.delete();
 
     //Node1 has 1 edge before delete
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
     Assert.assertEquals(edgeSet.size(), 1);
 
     // delete, commit, push
@@ -216,7 +219,7 @@ public class GitFlowGraphMonitorTest {
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
     //Check if edge1 has been deleted from the graph
-    edgeSet = this.flowGraph.getEdges("node1");
+    edgeSet = this.flowGraph.get().getEdges("node1");
     Assert.assertTrue(edgeSet.size() == 0);
   }
 
@@ -227,9 +230,9 @@ public class GitFlowGraphMonitorTest {
     node2File.delete();
 
     //Ensure node1 and node2 are present in the graph before delete
-    DataNode node1 = this.flowGraph.getNode("node1");
+    DataNode node1 = this.flowGraph.get().getNode("node1");
     Assert.assertNotNull(node1);
-    DataNode node2 = this.flowGraph.getNode("node2");
+    DataNode node2 = this.flowGraph.get().getNode("node2");
     Assert.assertNotNull(node2);
 
     // delete, commit, push
@@ -241,9 +244,9 @@ public class GitFlowGraphMonitorTest {
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
     //Check if node1 and node 2 have been deleted from the graph
-    node1 = this.flowGraph.getNode("node1");
+    node1 = this.flowGraph.get().getNode("node1");
     Assert.assertNull(node1);
-    node2 = this.flowGraph.getNode("node2");
+    node2 = this.flowGraph.get().getNode("node2");
     Assert.assertNull(node2);
   }
 
@@ -270,9 +273,9 @@ public class GitFlowGraphMonitorTest {
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
     //Ensure node1 and node2 are present in the graph
-    DataNode node1 = this.flowGraph.getNode("node1");
+    DataNode node1 = this.flowGraph.get().getNode("node1");
     Assert.assertNotNull(node1);
-    DataNode node2 = this.flowGraph.getNode("node2");
+    DataNode node2 = this.flowGraph.get().getNode("node2");
     Assert.assertNotNull(node2);
     testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
 
@@ -293,9 +296,9 @@ public class GitFlowGraphMonitorTest {
     this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
-    node1 = this.flowGraph.getNode("node1");
+    node1 = this.flowGraph.get().getNode("node1");
     Assert.assertNotNull(node1);
-    Assert.assertEquals(this.flowGraph.getEdges(node1).size(), 0);
+    Assert.assertEquals(this.flowGraph.get().getEdges(node1).size(), 0);
   }
 
   @AfterClass
@@ -339,7 +342,7 @@ public class GitFlowGraphMonitorTest {
   }
 
   private void testIfEdgeSuccessfullyAdded(String node1, String node2, String edgeName, String value) throws ExecutionException, InterruptedException {
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1);
+    Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges(node1);
     Assert.assertEquals(edgeSet.size(), 1);
     FlowEdge flowEdge = edgeSet.iterator().next();
     Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2, edgeName));
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
index 994240690..e0d022342 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
@@ -107,7 +107,7 @@ public class CallbacksDispatcher<L> implements Closeable {
   public synchronized void addListener(L listener) {
     Preconditions.checkNotNull(listener);
 
-    _log.info("Adding listener:" + listener);
+    _log.debug("Adding listener:" + listener);
     _listeners.add(listener);
   }
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
index 4989275a6..00c8472d5 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.util.filesystem;
 
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
 import org.apache.hadoop.fs.Path;
 
 import org.apache.gobblin.util.Decorator;
@@ -42,73 +45,83 @@ public class ExceptionCatchingPathAlterationListenerDecorator implements PathAlt
 
   @Override
   public void onStart(PathAlterationObserver observer) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onStart(observer);
-    } catch (Throwable exc) {
-      log.error("onStart failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onFileCreate(Path path) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onFileCreate(path);
-    } catch (Throwable exc) {
-      log.error("onFileCreate failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onFileChange(Path path) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onFileChange(path);
-    } catch (Throwable exc) {
-      log.error("onFileChange failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onStop(PathAlterationObserver observer) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onStop(observer);
-    } catch (Throwable exc) {
-      log.error("onStop failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onDirectoryCreate(Path directory) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onDirectoryCreate(directory);
-    } catch (Throwable exc) {
-      log.error("onDirectoryCreate failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onDirectoryChange(Path directory) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onDirectoryChange(directory);
-    } catch (Throwable exc) {
-      log.error("onDirectoryChange failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onDirectoryDelete(Path directory) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onDirectoryDelete(directory);
-    } catch (Throwable exc) {
-      log.error("onDirectoryDelete failure: ", exc);
-    }
+      return null;
+    });
   }
 
   @Override
   public void onFileDelete(Path path) {
-    try {
+    logSwallowedThrowable(() -> {
       this.underlying.onFileDelete(path);
+      return null;
+    });
+  }
+
+  @Override
+  public void onCheckDetectedChange() {
+    logSwallowedThrowable(() -> {
+      this.underlying.onCheckDetectedChange();
+      return null;
+    });
+  }
+
+  protected void logSwallowedThrowable(Callable<Void> c) {
+    try {
+      c.call();
     } catch (Throwable exc) {
-      log.error("onFileDelete failure: ", exc);
+      String methodName = Arrays.stream(exc.getStackTrace()).findFirst().get().getMethodName();
+      log.error(methodName + " failed due to exception:", exc);
     }
   }
+
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
index fa4d94087..d271c9198 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
@@ -43,4 +43,10 @@ public interface PathAlterationListener {
   void onDirectoryDelete(final Path directory);
 
   void onFileDelete(final Path path);
+
+  /**
+   * Is invoked after any file or directory is modified, after processing the other change events
+   * Is only invoked once per poll, which is found in checkAndNotify() from {@link PathAlterationObserver}
+   */
+  void onCheckDetectedChange();
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
index cf5891a41..0a1ed6e6f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
@@ -44,4 +44,7 @@ public class PathAlterationListenerAdaptor implements PathAlterationListener {
 
   public void onFileDelete(final Path path) {
   }
+
+  public void onCheckDetectedChange() {
+  }
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
index 6fcb44374..1d6dbd0e2 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
@@ -43,9 +43,9 @@ public class PathAlterationObserver {
   private final PathFilter pathFilter;
   private final Comparator<Path> comparator;
   private final FileSystem fs;
-
   private final Path[] EMPTY_PATH_ARRAY = new Path[0];
 
+  private boolean changeApplied = false;
   /**
    * Final processing.
    */
@@ -163,13 +163,15 @@ public class PathAlterationObserver {
   /**
    * Check whether the file and its children have been created, modified or deleted.
    */
-  public void checkAndNotify()
+  public synchronized void checkAndNotify()
       throws IOException {
+    // If any files or directories are modified this flag will be set to true
+    this.changeApplied = false;
+
     /* fire onStart() */
     for (final PathAlterationListener listener : listeners.values()) {
       listener.onStart(this);
     }
-
     /* fire directory/file events */
     final Path rootPath = rootEntry.getPath();
 
@@ -183,10 +185,18 @@ public class PathAlterationObserver {
       // Didn't exist and still doesn't
     }
 
+    if (this.changeApplied) {
+      for (final PathAlterationListener listener : listeners.values()) {
+        // Fire onCheckDetectedChange to notify when one check contains any number of changes
+        listener.onCheckDetectedChange();
+      }
+    }
+
     /* fire onStop() */
     for (final PathAlterationListener listener : listeners.values()) {
       listener.onStop(this);
     }
+
   }
 
   /**
@@ -196,7 +206,7 @@ public class PathAlterationObserver {
    * @param previous The original list of paths
    * @param currentPaths The current list of paths
    */
-  private void checkAndNotify(final FileStatusEntry parent, final FileStatusEntry[] previous, final Path[] currentPaths)
+  private synchronized void checkAndNotify(final FileStatusEntry parent, final FileStatusEntry[] previous, final Path[] currentPaths)
       throws IOException {
 
     int c = 0;
@@ -265,8 +275,8 @@ public class PathAlterationObserver {
    *
    * @param entry The file entry
    */
-  private void doCreate(final FileStatusEntry entry) {
-
+  protected synchronized void doCreate(final FileStatusEntry entry) {
+    this.changeApplied = true;
     for (final PathAlterationListener listener : listeners.values()) {
       if (entry.isDirectory()) {
         listener.onDirectoryCreate(entry.getPath());
@@ -286,9 +296,10 @@ public class PathAlterationObserver {
    * @param entry The previous file system entry
    * @param path The current file
    */
-  private void doMatch(final FileStatusEntry entry, final Path path)
+  private synchronized void doMatch(final FileStatusEntry entry, final Path path)
       throws IOException {
     if (entry.refresh(path)) {
+      this.changeApplied = true;
       for (final PathAlterationListener listener : listeners.values()) {
         if (entry.isDirectory()) {
           listener.onDirectoryChange(path);
@@ -304,8 +315,9 @@ public class PathAlterationObserver {
    *
    * @param entry The file entry
    */
-  private void doDelete(final FileStatusEntry entry) {
-    for (final PathAlterationListener listener : listeners.values()) {
+   private synchronized void doDelete(final FileStatusEntry entry) {
+     this.changeApplied = true;
+     for (final PathAlterationListener listener : listeners.values()) {
       if (entry.isDirectory()) {
         listener.onDirectoryDelete(entry.getPath());
       } else {