You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/24 21:50:05 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-748] Craftsmanship code cleaning in Gobblin Service Code

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9661916  [GOBBLIN-748] Craftsmanship code cleaning in Gobblin Service Code
9661916 is described below

commit 9661916684d466edf6ae499800a252c108bb9aa2
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Apr 24 14:49:54 2019 -0700

    [GOBBLIN-748] Craftsmanship code cleaning in Gobblin Service Code
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    Essentially a bunch of renaming, moving
    configurations key around.
    Things that we should agree on:
    - Configuration keys should always placed in the
    class where they are relevant, instead of
    `ConfigurationKeys.java` or similar things. We
    should definitely stop doing that.
    - Naming of `FSFlowCatalog` is misleading. In fact
    it should be `FsTemplateFlowCatalog`. Also, it
    should not extend `FsJobCatalog`. The missing
    piece is a `FsCatalogBase`  that implement all
    FileSystem related basic functionality, and have
    `Template` and `Job` FsCatalog extending that base
    implementation with it own interface.
    
    ### JIRA
    - [x ] My PR addresses the following [Gobblin
    JIRA]
        -
    https://issues.apache.org/jira/browse/GOBBLIN-748
    
    ### Description
    - [ x] Here are some details about my PR,
    including screenshots (if applicable):
    
    ### Tests
    - [ ] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [ ] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Some naive code changes made while reading through
    GaaS code: There are some work needs to be done
    for understand master/slave interaction
    
    Adding some documentation and renaming stuff
    
    Address comments
    
    Resolving conflicts
    
    Closes #2613 from
    autumnust/craftsmanshipCleaningGaaS
---
 .../gobblin/configuration/ConfigurationKeys.java   | 11 -----
 .../org/apache/gobblin/service/FlowConfigTest.java |  3 +-
 .../apache/gobblin/service/FlowConfigV2Test.java   |  3 +-
 .../apache/gobblin/runtime/api/JobTemplate.java    |  5 +++
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 46 +++++++++++++--------
 .../runtime/spec_catalog/TopologyCatalog.java      |  2 +-
 .../gobblin/runtime/spec_store/FSSpecStore.java    | 10 +++--
 .../runtime/spec_store/FSSpecStoreTest.java        |  7 ++--
 .../service/modules/core/GitConfigMonitor.java     |  2 +-
 .../service/modules/core/GitFlowGraphMonitor.java  |  6 +--
 .../service/modules/flow/MultiHopFlowCompiler.java |  6 +--
 .../service/modules/flowgraph/BaseFlowEdge.java    | 20 +++------
 .../gobblin/service/modules/flowgraph/Dag.java     |  3 ++
 .../service/modules/flowgraph/FlowEdgeFactory.java |  7 ++--
 .../service/modules/orchestration/DagManager.java  | 16 +++++---
 .../modules/orchestration/DagStateStore.java       | 12 +++---
 .../modules/orchestration/FSDagStateStore.java     |  2 +-
 .../service/modules/spec/JobExecutionPlan.java     |  4 +-
 ...FlowCatalog.java => FSFlowTemplateCatalog.java} | 47 +++++++++++++++-------
 .../service/modules/core/GitConfigMonitorTest.java |  2 +-
 .../modules/core/GitFlowGraphMonitorTest.java      |  8 ++--
 .../service/modules/core/GobblinServiceHATest.java |  5 ++-
 .../modules/core/GobblinServiceManagerTest.java    |  3 +-
 .../modules/flow/MultiHopFlowCompilerTest.java     |  6 +--
 .../modules/flowgraph/BaseFlowEdgeFactoryTest.java |  4 +-
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  4 +-
 .../spec/JobExecutionPlanDagFactoryTest.java       |  6 +--
 ...logTest.java => FSFlowTemplateCatalogTest.java} |  6 +--
 28 files changed, 146 insertions(+), 110 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8cdddb8..a5b56bb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -883,23 +883,12 @@ public class ConfigurationKeys {
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
 
   /***
-   * Configuration properties related to Spec Store
-   */
-  public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";
-
-  /***
    * Configuration properties related to TopologySpec Store
    */
   public static final String TOPOLOGYSPEC_STORE_CLASS_KEY = "topologySpec.store.class";
   public static final String TOPOLOGYSPEC_STORE_DIR_KEY = "topologySpec.store.dir";
 
   /***
-   * Configuration properties related to FlowSpec Store
-   */
-  public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
-  public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
-
-  /***
    * Configuration properties related to Spec Executor Instance
    */
   public static final String SPECEXECUTOR_INSTANCE_URI_KEY = "specExecInstance.uri";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 1e549e8..ecd884e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -70,7 +71,7 @@ public class FlowConfigTest {
 
     configBuilder
         .addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
-        .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+        .addPrimitive(FSSpecStore.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
     cleanUpDir(TEST_SPEC_STORE_DIR);
 
     Config config = configBuilder.build();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 9b43875..33e8e58 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -65,7 +66,7 @@ public class FlowConfigV2Test {
 
     configBuilder
         .addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
-        .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+        .addPrimitive(FSSpecStore.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
     cleanUpDir(TEST_SPEC_STORE_DIR);
 
     Config config = configBuilder.build();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index f8f3071..1be22fc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -58,6 +58,11 @@ public interface JobTemplate extends Spec {
    */
   Config getResolvedConfig(Config userProps) throws SpecNotFoundException, TemplateException;
 
+  /**
+   * The Exception thrown while occurring error when loading/resolving a template.
+   * Note that the template here is not necessary to be a JobTemplate, it could be a FlowTemplate in the
+   * context of Gobblin Service.
+   */
   class TemplateException extends Exception {
     public TemplateException(String message, Throwable cause) {
       super(message, cause);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 71e0f9f..9b4c00c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,10 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -27,22 +31,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
+import javax.annotation.Nonnull;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-
-import javax.annotation.Nonnull;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -61,8 +56,17 @@ import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
 
-@Alpha
+/**
+ * A service that interact with FlowSpec storage.
+ * The FlowSpec storage, a.k.a. {@link SpecStore} should be plugable with different implementation.
+ */
 public class FlowCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog, SpecSerDe {
+
+  /***
+   * Configuration properties related to FlowSpec Store
+   */
+  public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
+  public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
   public static final String DEFAULT_FLOWSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
 
   protected final SpecCatalogListenersList listeners;
@@ -104,15 +108,15 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
     try {
       Config newConfig = config;
-      if (config.hasPath(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY)) {
-        newConfig = config.withValue(ConfigurationKeys.SPECSTORE_FS_DIR_KEY,
-            config.getValue(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY));
+      if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
+        newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
+            config.getValue(FLOWSPEC_STORE_DIR_KEY));
       }
       String specStoreClassName = DEFAULT_FLOWSPEC_STORE_CLASS;
-      if (config.hasPath(ConfigurationKeys.FLOWSPEC_STORE_CLASS_KEY)) {
-        specStoreClassName = config.getString(ConfigurationKeys.FLOWSPEC_STORE_CLASS_KEY);
+      if (config.hasPath(FLOWSPEC_STORE_CLASS_KEY)) {
+        specStoreClassName = config.getString(FLOWSPEC_STORE_CLASS_KEY);
       }
-      this.log.info("Using audit sink class name/alias " + specStoreClassName);
+      this.log.info(String.format("Using class name/alias [%s] for specstore", specStoreClassName));
       this.specStore = (SpecStore) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
           specStoreClassName)), newConfig, this);
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
@@ -252,6 +256,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
   }
 
+  /**
+   * Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
+   * is set to true.
+   *
+   * @param spec The Spec to be added
+   * @param triggerListener True if listeners should be notified.
+   * @return
+   */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
     try {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c44e111..043dd04 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -111,7 +111,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
     try {
       Config newConfig = config;
       if (config.hasPath(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY)) {
-        newConfig = config.withValue(ConfigurationKeys.SPECSTORE_FS_DIR_KEY,
+        newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
             config.getValue(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY));
       }
       String specStoreClassName = DEFAULT_TOPOLOGYSPEC_STORE_CLASS;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index e93464d..344901b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.api.Spec;
@@ -56,6 +55,11 @@ import org.slf4j.LoggerFactory;
  */
 public class FSSpecStore implements SpecStore {
 
+  /***
+   * Configuration properties related to Spec Store
+   */
+  public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";
+
   protected final Logger log;
   protected final Config sysConfig;
   protected final FileSystem fs;
@@ -79,13 +83,13 @@ public class FSSpecStore implements SpecStore {
 
   public FSSpecStore(Config sysConfig, SpecSerDe specSerDe, Optional<Logger> log)
       throws IOException {
-    Preconditions.checkArgument(sysConfig.hasPath(ConfigurationKeys.SPECSTORE_FS_DIR_KEY),
+    Preconditions.checkArgument(sysConfig.hasPath(SPECSTORE_FS_DIR_KEY),
         "FS SpecStore path must be specified.");
 
     this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.sysConfig = sysConfig;
     this.specSerDe = specSerDe;
-    this.fsSpecStoreDir = this.sysConfig.getString(ConfigurationKeys.SPECSTORE_FS_DIR_KEY);
+    this.fsSpecStoreDir = this.sysConfig.getString(SPECSTORE_FS_DIR_KEY);
     this.fsSpecStoreDirPath = new Path(this.fsSpecStoreDir);
     this.log.info("FSSpecStore directory is: " + this.fsSpecStoreDir);
     try {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
index e0e6ebb..a952f5d 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.util.ConfigUtils;
@@ -50,7 +49,7 @@ public class FSSpecStoreTest {
   public void testPathConversion() throws Exception {
     Properties properties = new Properties();
     File tmpDir = Files.createTempDir();
-    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, tmpDir.getAbsolutePath());
+    properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, tmpDir.getAbsolutePath());
     SpecSerDe specSerDe = Mockito.mock(SpecSerDe.class);
     FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), specSerDe);
 
@@ -68,7 +67,7 @@ public class FSSpecStoreTest {
 
     File specDir = Files.createTempDir();
     Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+    properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
     SpecSerDe serde = Mockito.mock(SpecSerDe.class);
     TestFsSpecStore fsSpecStore = new TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
 
@@ -122,7 +121,7 @@ public class FSSpecStoreTest {
   public void testGetSpecURI() throws Exception {
     File specDir = Files.createTempDir();
     Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+    properties.setProperty(FSSpecStore.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
     SpecSerDe serde = Mockito.mock(SpecSerDe.class);
     FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 4fe03da..7bbd4f6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.util.PullFileLoader;
 
 /**
  * Service that monitors for jobs from a git repository.
- * The git repository must have an inital commit that has no config files since that is used as a base for getting
+ * The git repository must have an initial commit that has no config files since that is used as a base for getting
  * the change list.
  * The config needs to be organized with the following structure:
  * <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 978875f..024c9a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -49,7 +49,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -88,13 +88,13 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
           .put(SHOULD_CHECKPOINT_HASHES, false)
           .build());
 
-  private Optional<FSFlowCatalog> flowCatalog;
+  private Optional<FSFlowTemplateCatalog> flowCatalog;
   private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
   private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, Optional<FSFlowCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+  public GitFlowGraphMonitor(Config config, Optional<FSFlowTemplateCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
     this.flowGraph = graph;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 756dfaa..347235a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -53,7 +54,6 @@ import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -88,11 +88,11 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
     this.flowGraph = new BaseFlowGraph();
-    Optional<FSFlowCatalog> flowCatalog = Optional.absent();
+    Optional<FSFlowTemplateCatalog> flowCatalog = Optional.absent();
     if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
         && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
       try {
-        flowCatalog = Optional.of(new FSFlowCatalog(config));
+        flowCatalog = Optional.of(new FSFlowTemplateCatalog(config));
       } catch (IOException e) {
         throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 250fd57..0c1023e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,33 +17,22 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 
 import joptsimple.internal.Strings;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.util.PullFileLoader;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -133,11 +122,12 @@ public class BaseFlowEdge implements FlowEdge {
      * A method to return an instance of {@link BaseFlowEdge}. The method performs all the validation checks
      * and returns
      * @param edgeProps Properties of edge
-     * @param flowCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
+     * @param flowTemplateCatalog Flow Catalog used to retrieve {@link FlowTemplate}s.
      * @return a {@link BaseFlowEdge}
      */
     @Override
-    public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException {
+    public FlowEdge createFlowEdge(Config edgeProps, FSFlowTemplateCatalog flowTemplateCatalog, List<SpecExecutor> specExecutors)
+        throws FlowEdgeCreationException {
       try {
         String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
@@ -157,7 +147,7 @@ public class BaseFlowEdge implements FlowEdge {
             .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
         boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
 
-        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
+        FlowTemplate flowTemplate = flowTemplateCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
         return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
       } catch (RuntimeException e) {
         throw e;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index c38460e..24fbb9a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -238,6 +238,9 @@ public class Dag<T> {
     return this;
   }
 
+  /**
+   * DagNode is essentially a job within a Dag, usually they are used interchangeably.
+   */
   @Getter
   public static class DagNode<T> {
     private T value;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 3744bf0..eb1d026 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -22,18 +22,19 @@ import java.util.List;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
 
 public interface FlowEdgeFactory {
   /**
    * Construct a {@link FlowEdge} from the edge properties
    * @param edgeProps properties of the {@link FlowEdge}
-   * @param flowCatalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
+   * @param flowCatalog an instance of {@link FSFlowTemplateCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
    *               useful for creating a {@link FlowEdge}.
    * @return an instance of {@link FlowEdge}
    * @throws FlowEdgeCreationException
    */
-  public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
+  public FlowEdge createFlowEdge(Config edgeProps, FSFlowTemplateCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
 
   public class FlowEdgeCreationException extends Exception {
     private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a9c1bbf..2e2360b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -313,7 +313,7 @@ public class DagManager extends AbstractIdleService {
         }
         log.debug("Polling job statuses..");
         //Poll and update the job statuses of running jobs.
-        pollJobStatuses();
+        pollAndAdvanceDag();
         log.debug("Poll done.");
         //Clean up any finished dags
         log.debug("Cleaning up finished dags..");
@@ -357,10 +357,9 @@ public class DagManager extends AbstractIdleService {
     }
 
     /**
-     * Poll the statuses of running jobs.
-     * @return List of {@link JobStatus}es.
+     * Proceed the execution of each dag node based on job status.
      */
-    private void pollJobStatuses()
+    private void pollAndAdvanceDag()
         throws IOException {
       this.failedDagIdsFinishRunning.clear();
 
@@ -428,7 +427,13 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
+    /**
+     * Obtain next dag
+     * @param dagId The dagId that has been processed.
+     * @return
+     * @throws IOException
+     */
+    synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
       //Submit jobs from the dag ready for execution.
@@ -437,6 +442,7 @@ public class DagManager extends AbstractIdleService {
       }
       //Checkpoint the dag state
       this.dagStateStore.writeCheckpoint(dag);
+
       Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
       dagIdToNextJobs.put(dagId, nextNodes);
       return dagIdToNextJobs;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 0984a91..55ce6f0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -34,20 +34,22 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 public interface DagStateStore {
   /**
    * Persist the {@link Dag} to the backing store.
-   * @param dag
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
    */
-  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
 
   /**
    * Delete the {@link Dag} from the backing store, typically upon completion of execution.
-   * @param dag
+   * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
    */
-  public void cleanUp(Dag<JobExecutionPlan> dag);
+  void cleanUp(Dag<JobExecutionPlan> dag);
 
   /**
    * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
    * takes over or on restart of service.
    * @return a {@link List} of currently running {@link Dag}s.
    */
-  public List<Dag<JobExecutionPlan>> getDags() throws IOException;
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 5473c8e..338cea7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -121,7 +121,7 @@ public class FSDagStateStore implements DagStateStore {
   /**
    * Return a {@link Dag} given a file name.
    * @param dagFile
-   * @return the {@link Dag} associated with the dagFiel.
+   * @return the {@link Dag} associated with the dagFile.
    */
   @VisibleForTesting
   public Dag<JobExecutionPlan> getDag(File dagFile) throws IOException {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index d4ac3ac..45e65bc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -108,7 +108,7 @@ public class JobExecutionPlan {
       jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
 
       //Remove template uri
-      jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowCatalog.JOB_TEMPLATE_KEY));
+      jobSpec.setConfig(jobSpec.getConfig().withoutPath(FSFlowTemplateCatalog.JOB_TEMPLATE_KEY));
 
       // Add job.name and job.group
       jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
similarity index 84%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index e1f2b6a..0dafc2d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -56,9 +56,15 @@ import org.apache.gobblin.util.PathUtils;
  * <p> /path/to/template/catalog/flowName/flow.conf </p>
  * <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p>
  * <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p>
+ *
+ * Avoid confusing with {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} which is a catalog for
+ * {@link org.apache.gobblin.runtime.api.FlowSpec}.
+ *
+ * Note that any exceptions thrown here should be propagated into called level for handling, since the handling
+ * of exceptions while loading/resolving template is subject to caller logic.
  */
 @Alpha
-public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
+public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
   public static final String JOBS_DIR_NAME = "jobs";
   public static final String FLOW_CONF_FILE_NAME = "flow.conf";
   public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", ".template");
@@ -71,7 +77,7 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
    * @param sysConfig that must contain the fully qualified path of the flow template catalog
    * @throws IOException
    */
-  public FSFlowCatalog(Config sysConfig)
+  public FSFlowTemplateCatalog(Config sysConfig)
       throws IOException {
     super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
         sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
@@ -87,13 +93,10 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
    */
   public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
       throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
-    if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
-      throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
-    }
-    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException(
-          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
+    if (!validateTemplateURI(flowTemplateDirURI)) {
+      throw new JobTemplate.TemplateException(String.format("The FlowTemplate %s is not valid", flowTemplateDirURI));
     }
+
     String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     // path of uri is location of template file relative to the job configuration root directory
     Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI.getPath()));
@@ -125,13 +128,10 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
       return false;
     };
 
-    if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
-      throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
-    }
-    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException(
-          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
+    if (!validateTemplateURI(flowTemplateDirURI)) {
+      throw new JobTemplate.TemplateException(String.format("The FlowTemplate %s is not valid", flowTemplateDirURI));
     }
+
     List<JobTemplate> jobTemplates = new ArrayList<>();
 
     String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
@@ -167,4 +167,23 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
       return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
     }
   }
+
+  /**
+   * Determine if an URI of a jobTemplate or a FlowTemplate is valid.
+   * @param flowURI The given job/flow template
+   * @return true if the URI is valid.
+   */
+  private boolean validateTemplateURI(URI flowURI) {
+    if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
+      log.error("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
+      return false;
+    }
+    if (!flowURI.getScheme().equals(FS_SCHEME)) {
+      log.error(
+          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowURI.getScheme());
+      return false;
+    }
+
+    return true;
+  }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 71ad56d..1912d15 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -96,7 +96,7 @@ public class GitConfigMonitorTest {
         .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
             this.remoteRepo.getDirectory().getAbsolutePath())
         .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
-        .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+        .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
         .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
         .build();
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 26f2180..6a02652 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.dircache.DirCache;
@@ -60,7 +61,6 @@ import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 
 
 public class GitFlowGraphMonitorTest {
@@ -81,7 +81,7 @@ public class GitFlowGraphMonitorTest {
   private final File edge1File = new File(edge1Dir, "edge1.properties");
 
   private RefSpec masterRefSpec = new RefSpec("master");
-  private Optional<FSFlowCatalog> flowCatalog;
+  private Optional<FSFlowTemplateCatalog> flowCatalog;
   private Config config;
   private BaseFlowGraph flowGraph;
   private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -111,7 +111,7 @@ public class GitFlowGraphMonitorTest {
         .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
         .build();
 
-    // Create a FSFlowCatalog instance
+    // Create a FSFlowTemplateCatalog instance
     URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
     Properties properties = new Properties();
     properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -119,7 +119,7 @@ public class GitFlowGraphMonitorTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = Optional.of(new FSFlowCatalog(templateCatalogCfg));
+    this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
 
     //Create a FlowGraph instance with defaults
     this.flowGraph = new BaseFlowGraph();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index efb1a6d..a4a4497 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class GobblinServiceHATest {
     Properties node1ServiceCoreProperties = new Properties();
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
     node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_1_TOPOLOGY_SPEC_STORE_DIR);
-    node1ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
+    node1ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
     node1ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_1_JOB_STATUS_STATE_STORE_DIR);
     node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler1");
     node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
@@ -150,7 +151,7 @@ public class GobblinServiceHATest {
     Properties node2ServiceCoreProperties = new Properties();
     node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
     node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_2_TOPOLOGY_SPEC_STORE_DIR);
-    node2ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
+    node2ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
     node2ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_2_JOB_STATUS_STATE_STORE_DIR);
     node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler2");
     node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index efe1b95..d5105be 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.eclipse.jgit.api.Git;
@@ -110,7 +111,7 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
 
     serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
-    serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
+    serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".description",
         "StandaloneTestExecutor");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 5877e66..4bffdb1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -84,7 +85,6 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -124,7 +124,7 @@ public class MultiHopFlowCompilerTest {
     URI specExecutorCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
     Map<URI, TopologySpec> topologySpecMap = buildTopologySpecMap(specExecutorCatalogUri);
 
-    //Create a FSFlowCatalog instance
+    //Create a FSFlowTemplateCatalog instance
     URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
     Properties properties = new Properties();
     properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -132,7 +132,7 @@ public class MultiHopFlowCompilerTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    FSFlowTemplateCatalog flowCatalog = new FSFlowTemplateCatalog(templateCatalogCfg);
 
     //Add FlowEdges from the edge properties files
     URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
index 085d0a7..79067f8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -34,7 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -66,7 +66,7 @@ public class BaseFlowEdgeFactoryTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
     Config edgeProps = ConfigUtils.propertiesToConfig(properties);
     FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog, specExecutorList);
     Assert.assertEquals(flowEdge.getSrc(), "node1");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index e289cbd..d30de37 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -40,6 +40,8 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
+
 
 public class GobblinServiceJobSchedulerTest {
   private static final String TEST_GROUP_NAME = "testGroup";
@@ -56,7 +58,7 @@ public class GobblinServiceJobSchedulerTest {
     File specDir = Files.createTempDir();
 
     Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
     FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
     ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 2542f5e..4ffdb67 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -56,7 +56,7 @@ public class JobExecutionPlanDagFactoryTest {
 
   @BeforeClass
   public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
-    // Create a FSFlowCatalog instance
+    // Create a FSFlowTemplateCatalog instance
     URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
     Properties properties = new Properties();
     properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
@@ -64,7 +64,7 @@ public class JobExecutionPlanDagFactoryTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
     FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
     this.jobTemplates = flowTemplate.getJobTemplates();
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
similarity index 96%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
rename to gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index 3c8ebd3..550c97d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -43,21 +43,21 @@ import org.testng.collections.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class FSFlowCatalogTest {
+public class FSFlowTemplateCatalogTest {
   private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
   private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
 
   @Test
   public void testGetFlowTemplate() throws Exception {
     URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    // Create a FSFlowCatalog instance
+    // Create a FSFlowTemplateCatalog instance
     Properties properties = new Properties();
     properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
     Config config = ConfigFactory.parseProperties(properties);
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FSFlowTemplateCatalog catalog = new FSFlowTemplateCatalog(templateCatalogCfg);
     FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
 
     //Basic sanity check for the FlowTemplate