You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/20 23:01:15 UTC

[gobblin] branch master updated: [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 635e81f8b [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)
635e81f8b is described below

commit 635e81f8bcf10099cddf93d4e35e23c905fc840c
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Oct 20 16:01:09 2022 -0700

    [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)
    
    * Add shared flowgraph format and helper
    
    * Use static variables for constants
    
    * Handle scenario where test dir can be at root
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   1 +
 .../modules/flowgraph/BaseFlowGraphHelper.java     |  58 ++++++----
 .../modules/flowgraph/SharedFlowGraphHelper.java   | 126 +++++++++++++++++++++
 .../service/monitoring/FsFlowGraphMonitor.java     |  22 +++-
 .../service/monitoring/FsFlowGraphMonitorTest.java |  50 ++++++--
 5 files changed, 223 insertions(+), 34 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index d9c251e69..1aa6e30dc 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -52,6 +52,7 @@ public class ServiceConfigKeys {
   public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName";
   public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec";
   public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraph.class";
+  public static final String GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraphHelper.class";
 
   // Helix message sub types for FlowSpec
   public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 4f0b82779..3e705a30b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -57,11 +57,21 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * Provides the common set of functionalities needed by {@link FlowGraphMonitor} to read changes in files and
  * apply them to a {@link FlowGraph}
  * Assumes that the directory structure between flowgraphs configuration files are the same.
+ *
+ * Assumes that the flowgraph follows this format
+ *  /gobblin-flowgraph
+ *    /nodeA
+ *      /nodeB
+ *        edgeAB.properties
+ *      nodeA.properties
+ *    /nodeB
+ *      nodeB.properties
+ *
  */
 @Slf4j
 public class BaseFlowGraphHelper {
-  public static final int NODE_FILE_DEPTH = 3;
-  public static final int EDGE_FILE_DEPTH = 4;
+  private static final int NODE_FILE_DEPTH = 3;
+  private static final int EDGE_FILE_DEPTH = 4;
   private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
 
   final String baseDirectory;
@@ -69,12 +79,12 @@ public class BaseFlowGraphHelper {
 
   private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
   private final Map<URI, TopologySpec> topologySpecMap;
-  private MetricContext metricContext;
+  protected MetricContext metricContext;
   final String flowGraphFolderName;
   final PullFileLoader pullFileLoader;
   final Set<String> javaPropsExtensions;
   final Set<String> hoconFileExtensions;
-  private final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
+  protected final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
 
   public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
       Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
@@ -106,9 +116,9 @@ public class BaseFlowGraphHelper {
    * to instantiate a {@link DataNode} from the node config file.
    * @param path of node to add
    */
-  protected void addDataNode(FlowGraph graph, String path) {
-    if (checkFilePath(path, NODE_FILE_DEPTH)) {
-      Path nodeFilePath = new Path(this.baseDirectory, path);
+  protected void addDataNode(FlowGraph graph, java.nio.file.Path path) {
+    if (!java.nio.file.Files.isDirectory(path) && checkFilePath(path.toString(), getNodeFileDepth())) {
+      Path nodeFilePath = new Path(this.baseDirectory, path.toString());
       try {
         Config config = loadNodeFileWithOverrides(nodeFilePath);
         Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
@@ -133,9 +143,9 @@ public class BaseFlowGraphHelper {
    * provided by the {@link FlowGraph} to build a {@link FlowEdge} from the edge config file.
    * @param path of edge to add
    */
-  protected void addFlowEdge(FlowGraph graph, String path) {
-    if (checkFilePath(path, EDGE_FILE_DEPTH)) {
-      Path edgeFilePath = new Path(this.baseDirectory, path);
+  protected void addFlowEdge(FlowGraph graph, java.nio.file.Path path) {
+    if (!java.nio.file.Files.isDirectory(path) && checkFilePath(path.toString(), getEdgeFileDepth())) {
+      Path edgeFilePath = new Path(this.baseDirectory, path.toString());
       try {
         Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
         List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
@@ -168,7 +178,7 @@ public class BaseFlowGraphHelper {
    * @param file the relative path from the root of the flowgraph
    * @return false if the file does not conform
    */
-  private boolean checkFilePath(String file, int depth) {
+  protected boolean checkFilePath(String file, int depth) {
     // The file is either a node file or an edge file and needs to be stored at either:
     // flowGraphDir/nodeName/nodeName.properties (if it is a node file), or
     // flowGraphDir/nodeName/nodeName/edgeName.properties (if it is an edge file)
@@ -197,7 +207,7 @@ public class BaseFlowGraphHelper {
     for (int i = 0; i < depth - 1; i++) {
       path = path.getParent();
     }
-    return path.getName().equals(flowGraphFolderName);
+    return path != null ? path.getName().equals(flowGraphFolderName) : false;
   }
 
   /**
@@ -285,20 +295,18 @@ public class BaseFlowGraphHelper {
     FlowGraph newFlowGraph = new BaseFlowGraph();
     java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
     try {
-      List<Path> edges = new ArrayList<>();
+      List<java.nio.file.Path> edges = new ArrayList<>();
       // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
       // Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
       java.nio.file.Files.walk(graphPath).forEach(fileName -> {
-        if (!java.nio.file.Files.isDirectory(fileName)) {
-          if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
-            addDataNode(newFlowGraph, fileName.toString());
-          } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
-            edges.add(new Path(fileName.toString()));
-          }
+        if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), getNodeFileDepth())) {
+          addDataNode(newFlowGraph, fileName);
+        } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), getEdgeFileDepth())) {
+          edges.add(fileName);
         }
       });
-      for (Path edge : edges) {
-        addFlowEdge(newFlowGraph, edge.toString());
+      for (java.nio.file.Path edge : edges) {
+        addFlowEdge(newFlowGraph, edge);
       }
       return newFlowGraph;
     } catch (IOException e) {
@@ -321,4 +329,12 @@ public class BaseFlowGraphHelper {
   public String getEdgeId(String source, String destination, String edgeName) {
     return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(source, destination, edgeName);
   }
+
+  protected int getNodeFileDepth() { // path depth of node entries, counting the flowgraph folder as 1; overridable
+    return NODE_FILE_DEPTH;
+  }
+
+  protected int getEdgeFileDepth() { // path depth of edge files, counting the flowgraph folder as 1; overridable
+    return EDGE_FILE_DEPTH;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
new file mode 100644
index 000000000..ce0c98b3d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Supports a configuration of a flowgraph where it can support multiple sub-flowgraphs within its directory
+ * Node definitions are shared between each subgraph, but can be overwritten within the subgraph
+ * Edge definitions are only defined in the subgraphs
+ * e.g.
+ * /gobblin-flowgraph-absolute-dir
+ *   /subgraphA
+ *     /nodeA (NODE_FOLDER_DEPTH)
+ *       /nodeB
+ *         edgeAB.properties
+ *   /subgraphB
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties (EDGE_FILE_DEPTH)
+ *       A.properties (NODE_FILE_DEPTH)
+ *  /nodes
+ *    A.properties
+ *    B.properties
+ */
+@Slf4j
+public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
+
+  protected String sharedNodeFolder;
+  private static String NODE_FILE_SUFFIX = ".properties";
+  private static String SHARED_NODE_FOLDER_NAME = "nodes";
+  private static int NODE_FOLDER_DEPTH = 2;
+
+  public SharedFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, boolean instrumentationEnabled, Config config) {
+    super(flowTemplateCatalog, topologySpecMap, baseDirectory, flowGraphFolderName, javaPropsExtentions, hoconFileExtensions, instrumentationEnabled, config);
+    this.sharedNodeFolder = baseDirectory + File.separator + SHARED_NODE_FOLDER_NAME;
+  }
+
+  /**
+   * Looks into the sharedNodeFolder to use those configurations as fallbacks for the node to add
+   * Otherwise if the shared node does not exist, attempt to add the node in the same manner as {@link BaseFlowGraphHelper}
+   * @param graph
+   * @param path of node folder in the subgraph, so path is expected to be a directory
+   */
+  @Override
+  protected void addDataNode(FlowGraph graph, java.nio.file.Path path) {
+    try {
+      // Load node from shared folder first if it exists
+      Config sharedNodeConfig = ConfigFactory.empty();
+      String nodePropertyFile = path.getFileName().toString() + NODE_FILE_SUFFIX;
+      File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
+      if (sharedNodeFile.exists()) {
+        sharedNodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
+      }
+      File nodeFilePath = new File(path.toString(), nodePropertyFile);
+      Config nodeConfig = sharedNodeConfig;
+      if (nodeFilePath.exists()) {
+        nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(sharedNodeConfig);
+      }
+      if (nodeConfig.isEmpty()) {
+        throw new IOException(String.format("Cannot find expected property file %s in %s or %s", nodePropertyFile, sharedNodeFolder, path));
+      }
+      Class dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+      DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+      if (!graph.addDataNode(dataNode)) {
+        log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
+      } else {
+        log.info("Added Datanode {} to FlowGraph", dataNode.getId());
+      }
+    } catch (Exception e) {
+      if (this.flowGraphUpdateFailedMeter.isPresent()) {
+        this.flowGraphUpdateFailedMeter.get().mark();
+      }
+      log.warn("Could not add DataNode {} defined in {} due to exception {}", path, e);
+    }
+  }
+
+  @Override
+  protected Config getNodeConfigWithOverrides(Config nodeConfig, Path nodeFilePath) {
+    String nodeId = FilenameUtils.removeExtension(nodeFilePath.getName().toString());
+    return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(nodeId));
+  }
+
+  @Override
+  protected int getNodeFileDepth() {
+    return NODE_FOLDER_DEPTH;
+  }
+}
+
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
index 6183e7d66..bda777ae3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -18,11 +18,13 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Optional;
@@ -35,11 +37,14 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
 import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.filesystem.PathAlterationObserver;
 import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
 
@@ -74,13 +79,18 @@ public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraph
         TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
     this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
     this.observer = new PathAlterationObserver(flowGraphPath);
-    this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
-        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
-        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+    try {
+      String helperClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY,
+          BaseFlowGraphHelper.class.getCanonicalName());
+      this.flowGraphHelper = (BaseFlowGraphHelper) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(BaseFlowGraphHelper.class).resolve(helperClassName)), flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+          configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+          configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
     this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, compiler, flowGraphPath.toString(), this.flowGraphHelper);
-
-   this.compiler = compiler;
-   this.initComplete = initComplete;
+    this.compiler = compiler;
+    this.initComplete = initComplete;
 
     if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
       this.pathAlterationDetector = null;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 0c0799c23..1a4a36eff 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -23,6 +23,8 @@ import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -68,6 +70,7 @@ public class FsFlowGraphMonitorTest {
   private final File node2File = new File(node2Dir, NODE_2_FILE);
   private final File edge1Dir = new File(node1Dir, "node2");
   private final File edge1File = new File(edge1Dir, "edge1.properties");
+  private final File sharedNodeFolder = new File(TEST_DIR, "nodes");
 
   private RefSpec masterRefSpec = new RefSpec("master");
   private Optional<FSFlowTemplateCatalog> flowCatalog;
@@ -79,14 +82,14 @@ public class FsFlowGraphMonitorTest {
   @BeforeClass
   public void setUp() throws Exception {
     cleanUpDir(TEST_DIR.toString());
-
+    TEST_DIR.mkdirs();
 
     URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
     this.topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
 
     this.config = ConfigBuilder.create()
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
-            + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, this.flowGraphDir.getAbsolutePath())
+            + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, TEST_DIR.getAbsolutePath())
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
         .build();
@@ -173,6 +176,7 @@ public class FsFlowGraphMonitorTest {
   @Test (dependsOnMethods = "testUpdateNode")
   public void testSetUpExistingGraph() throws Exception {
     // Create a FlowGraph instance with defaults
+    this.flowGraphMonitor.shutDown();
     this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
     MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
 
@@ -186,10 +190,43 @@ public class FsFlowGraphMonitorTest {
     Assert.assertNotNull(this.flowGraph.get().getNode("node1"));
     Assert.assertNotNull(this.flowGraph.get().getNode("node2"));
     Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
-
   }
 
   @Test (dependsOnMethods = "testSetUpExistingGraph")
+  public void testSharedFlowgraphHelper() throws Exception {
+    this.flowGraphMonitor.shutDown(); // stop the running monitor; it is replaced below
+    Config sharedFlowgraphConfig = ConfigFactory.empty() // this.config plus the helper-class override
+        .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY, ConfigValueFactory.fromAnyRef("org.apache.gobblin.service.modules.flowgraph.SharedFlowGraphHelper"))
+        .withFallback(this.config);
+
+    // Start from an empty graph so the nodes asserted below must have been loaded by the new monitor
+    this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+    MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+    // Set up node 3: a subgraph folder with no properties file of its own, defined only in the shared folder
+    File node3Folder = new File(this.flowGraphDir, "node3");
+    node3Folder.mkdirs();
+    File node3File = new File(this.sharedNodeFolder, "node3.properties");
+    String file3Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam3=value3\n";
+
+    // Have different default values for node 1 in the shared folder; keys in the subgraph's node1 file take precedence
+    File node1File = new File(this.sharedNodeFolder, "node1.properties");
+    String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value10\n";
+
+    createNewFile(this.sharedNodeFolder, node3File, file3Contents);
+    createNewFile(this.sharedNodeFolder, node1File, file1Contents);
+
+    this.flowGraphMonitor = new FsFlowGraphMonitor(sharedFlowgraphConfig, this.flowCatalog, mhfc, this.topologySpecMap, new CountDownLatch(1), true);
+    this.flowGraphMonitor.startUp();
+    this.flowGraphMonitor.setActive(true);
+    // Let the monitor repopulate the flowgraph
+    Thread.sleep(3000);
+    Assert.assertNotNull(this.flowGraph.get().getNode("node3"));
+    DataNode node1 = this.flowGraph.get().getNode("node1");
+    Assert.assertTrue(node1.isActive());
+    Assert.assertEquals(node1.getRawConfig().getString("param2"), "value10");
+  }
+
+  @Test (dependsOnMethods = "testSharedFlowgraphHelper")
   public void testRemoveEdge() throws Exception {
     //Node1 has 1 edge before delete
     Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
@@ -219,7 +256,6 @@ public class FsFlowGraphMonitorTest {
     //delete node files
     FileUtils.deleteDirectory(node1FlowGraphFile);
     FileUtils.deleteDirectory(node2FlowGraphFile);
-
     // Let the monitor pick up the edges that were recently deleted
     Thread.sleep(3000);
 
@@ -297,13 +333,13 @@ public class FsFlowGraphMonitorTest {
   }
 
   private void cleanUpDir(String dir) {
-    File specStoreDir = new File(dir);
+    File dirToDelete = new File(dir);
 
     // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
     for (int i = 0; i < 5; i++) {
       try {
-        if (specStoreDir.exists()) {
-          FileUtils.deleteDirectory(specStoreDir);
+        if (dirToDelete.exists()) {
+          FileUtils.deleteDirectory(dirToDelete);
         }
         // if delete succeeded then break out of loop
         break;