You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/11 21:23:33 UTC

[2/2] incubator-gobblin git commit: [GOBBLIN-505] Implement a Git-based FlowGraph Monitor.

[GOBBLIN-505] Implement a Git-based FlowGraph Monitor.

Closes #2382 from sv2000/gitGraphMonitor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6b120185
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6b120185
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6b120185

Branch: refs/heads/master
Commit: 6b1201852aa71b904da5f8a49a550e8596fe59fc
Parents: 2c5e25d
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jun 11 14:23:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jun 11 14:23:21 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  25 +-
 .../service/modules/core/GitConfigMonitor.java  | 325 ++----------------
 .../modules/core/GitFlowGraphMonitor.java       | 309 ++++++++++++++++++
 .../modules/core/GitMonitoringService.java      | 326 +++++++++++++++++++
 .../service/modules/flowgraph/BaseFlowEdge.java |  51 +--
 .../modules/flowgraph/FlowEdgeFactory.java      |  13 +-
 .../flowgraph/FlowGraphConfigurationKeys.java   |   7 +-
 .../modules/core/GitConfigMonitorTest.java      |  21 +-
 .../modules/core/GobblinServiceManagerTest.java |  14 +-
 .../modules/flowgraph/BaseFlowGraphTest.java    |  46 +--
 .../modules/core/GitFlowGraphMonitorTest.java   | 314 ++++++++++++++++++
 11 files changed, 1059 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6196da..50e6020 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -103,7 +103,7 @@ public class ConfigurationKeys {
   public static final String JOB_CONFIG_FILE_DIR_KEY = "jobconf.dir";
 
   // Path where all job configuration files stored
-  public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath" ;
+  public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath";
   // Job configuration file extensions
   public static final String JOB_CONFIG_FILE_EXTENSIONS_KEY = "jobconf.extensions";
   public static final String DEFAULT_JOB_CONFIG_FILE_EXTENSIONS = "pull,job";
@@ -208,13 +208,13 @@ public class ConfigurationKeys {
   public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
 
   /** Optional, for user to specified which template to use, inside .job file */
-  public static final String JOB_TEMPLATE_PATH = "job.template" ;
+  public static final String JOB_TEMPLATE_PATH = "job.template";
 
   /**
    * Configuration property used only for job configuration file's tempalte, inside .template file
    */
   public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes";
-  public static final String JOB_DEPENDENCIES="dependencies";
+  public static final String JOB_DEPENDENCIES = "dependencies";
 
   /**
    * Configuration for emitting job events
@@ -250,7 +250,7 @@ public class ConfigurationKeys {
    */
   // This property is used to specify the URN of a dataset a job or WorkUnit extracts data for
   public static final String DATASET_URN_KEY = "dataset.urn";
-  public static final String GLOBAL_WATERMARK_DATASET_URN="__globalDatasetWatermark";
+  public static final String GLOBAL_WATERMARK_DATASET_URN = "__globalDatasetWatermark";
   public static final String DEFAULT_DATASET_URN = "";
 
   /**
@@ -900,16 +900,11 @@ public class ConfigurationKeys {
   public static final String CONFIG_BASED_PREFIX = "gobblin.configBased";
 
   /**
-   * Configuration related to the git flow config monitoring service
+   * Configuration related to the Git based monitoring service
    */
-  public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor.";
-  public static final String GIT_CONFIG_MONITOR_REPO_URI = GIT_CONFIG_MONITOR_PREFIX + "repositoryUri";
-  public static final String GIT_CONFIG_MONITOR_REPO_DIR = GIT_CONFIG_MONITOR_PREFIX + "repositoryDirectory";
-  public static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
-  public static final String GIT_CONFIG_MONITOR_CONFIG_DIR = GIT_CONFIG_MONITOR_PREFIX + "configDirectory";
-  public static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
-  public static final String GIT_CONFIG_MONITOR_POLLING_INTERVAL = GIT_CONFIG_MONITOR_PREFIX + "pollingInterval";
-  public static final String GIT_CONFIG_MONITOR_BRANCH_NAME = GIT_CONFIG_MONITOR_PREFIX + "branchName";
-  public static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
-  public static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+  public static final String GIT_MONITOR_REPO_URI = "repositoryUri";
+  public static final String GIT_MONITOR_REPO_DIR = "repositoryDirectory";
+  public static final String GIT_MONITOR_CONFIG_BASE_DIR = "configBaseDirectory";
+  public static final String GIT_MONITOR_POLLING_INTERVAL = "pollingInterval";
+  public static final String GIT_MONITOR_BRANCH_NAME = "branchName";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 09d7bb4..9fd0555 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -16,34 +16,13 @@
  */
 package org.apache.gobblin.service.modules.core;
 
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.ResetCommand;
-import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
-import org.eclipse.jgit.errors.RepositoryNotFoundException;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.ObjectReader;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.treewalk.CanonicalTreeParser;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
@@ -55,11 +34,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PullFileLoader;
 
-
 /**
  * Service that monitors for jobs from a git repository.
  * The git repository must have an inital commit that has no config files since that is used as a base for getting
@@ -69,148 +45,54 @@ import org.apache.gobblin.util.PullFileLoader;
  * The <flowGroup> and <flowName> is used to generate the URI used to store the config in the {@link FlowCatalog}
  */
 @Slf4j
-public class GitConfigMonitor extends AbstractIdleService {
+public class GitConfigMonitor extends GitMonitoringService {
+  public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor";
+
   private static final String SPEC_DESCRIPTION = "Git-based flow config";
   private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
-  private static final int TERMINATION_TIMEOUT = 30;
+  private static final String PROPERTIES_EXTENSIONS = "pull,job";
+  private static final String CONF_EXTENSIONS = "json,conf";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
+
   private static final int CONFIG_FILE_DEPTH = 3;
-  private static final String REMOTE_NAME = "origin";
+  private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
+          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
+          .put(GitMonitoringService.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+          .put(GitMonitoringService.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+          .build());
 
-  private final ScheduledExecutorService scheduledExecutor;
-  private final GitRepository gitRepo;
-  private final int pollingInterval;
-  private final String repositoryDir;
-  private final String configDir;
-  private final Path configDirPath;
   private final FlowCatalog flowCatalog;
-  private final PullFileLoader pullFileLoader;
   private final Config emptyConfig = ConfigFactory.empty();
-  private volatile boolean isActive = false;
 
-  /**
-   * Create a {@link GitConfigMonitor} that monitors a git repository for changes and manages config in a
-   * {@link FlowCatalog}
-   * @param config configuration
-   * @param flowCatalog the flow catalog
-   */
   GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
+    super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
-
-    this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
-        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
-
-    Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI),
-        ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI + " needs to be specified.");
-
-    String repositoryUri = config.getString(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI);
-
-    this.repositoryDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR,
-        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR);
-
-    this.configDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_CONFIG_DIR,
-        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR);
-
-    this.pollingInterval = ConfigUtils.getInt(config, ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL,
-        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL);
-
-    String branchName = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_BRANCH_NAME,
-        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME);
-
-    this.configDirPath = new Path(this.repositoryDir, this.configDir);
-
-    try {
-      this.pullFileLoader = new PullFileLoader(this.configDirPath,
-          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
-          PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create pull file loader", e);
-    }
-
-    try {
-      this.gitRepo = new GitRepository(repositoryUri, this.repositoryDir, branchName);
-    } catch (GitAPIException | IOException e) {
-      throw new RuntimeException("Could not open git repository", e);
-    }
-  }
-
-  /** Start the service. */
-  @Override
-  protected void startUp() throws Exception {
-    log.info("Starting the " + GitConfigMonitor.class.getSimpleName());
-    log.info("Polling git with inteval {} ", this.pollingInterval);
-
-    // Schedule the job config fetch task
-    this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          processGitConfigChanges();
-        } catch (GitAPIException | IOException e) {
-          log.error("Failed to process git config changes", e);
-          // next run will try again since errors could be intermittent
-        }
-      }
-    }, 0, this.pollingInterval, TimeUnit.SECONDS);
   }
 
-  /** Stop the service. */
   @Override
-  protected void shutDown() throws Exception {
-    this.scheduledExecutor.shutdown();
-    this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
-  }
-
-  public synchronized void setActive(boolean isActive) {
-    if (this.isActive == isActive) {
-      // No-op if already in correct state
-      return;
-    }
-
-    this.isActive = isActive;
-  }
-
-  /**
-   * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog}
-   * @throws GitAPIException
-   * @throws IOException
-   */
-  @VisibleForTesting
-  void processGitConfigChanges() throws GitAPIException, IOException {
+  public boolean shouldPollGit() {
     // if not active or if the flow catalog is not up yet then can't process config changes
     if (!isActive || !this.flowCatalog.isRunning()) {
-      log.info("GitConfigMonitor: skip poll since the JobCatalog is not yet running.");
-      return;
-    }
-
-    List<DiffEntry> changes = this.gitRepo.getChanges();
-
-    for (DiffEntry change : changes) {
-      switch (change.getChangeType()) {
-        case ADD:
-        case MODIFY:
-          addSpec(change);
-          break;
-        case DELETE:
-          removeSpec(change);
-          break;
-        case RENAME:
-          removeSpec(change);
-          addSpec(change);
-          break;
-        default:
-          throw new RuntimeException("Unsupported change type " + change.getChangeType());
-      }
+      log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet running. isActive = {}", this.isActive);
+      return false;
     }
-
-    // Done processing changes, so checkpoint
-    this.gitRepo.moveCheckpointAndHashesForward();
+    return true;
   }
 
   /**
    * Add a {@link FlowSpec} for an added, updated, or modified flow config
    * @param change
    */
-  private void addSpec(DiffEntry change) {
+  @Override
+  public void addChange(DiffEntry change) {
     if (checkConfigFilePath(change.getNewPath())) {
       Path configFilePath = new Path(this.repositoryDir, change.getNewPath());
 
@@ -232,7 +114,8 @@ public class GitConfigMonitor extends AbstractIdleService {
    * remove a {@link FlowSpec} for a deleted or renamed flow config
    * @param change
    */
-  private void removeSpec(DiffEntry change) {
+  @Override
+  public void removeChange(DiffEntry change) {
     if (checkConfigFilePath(change.getOldPath())) {
       Path configFilePath = new Path(this.repositoryDir, change.getOldPath());
       String flowName = FSSpecStore.getSpecName(configFilePath);
@@ -265,10 +148,10 @@ public class GitConfigMonitor extends AbstractIdleService {
     Path configFile = new Path(configFilePath);
     String fileExtension = Files.getFileExtension(configFile.getName());
 
-    if (configFile.depth() != CONFIG_FILE_DEPTH ||
-        !configFile.getParent().getParent().getName().equals(configDir) ||
-        !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) ||
-            PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) {
+    if (configFile.depth() != CONFIG_FILE_DEPTH
+        || !configFile.getParent().getParent().getName().equals(folderName)
+        || !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)
+        || PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) {
       log.warn("Changed file does not conform to directory structure and file name format, skipping: "
           + configFilePath);
 
@@ -292,144 +175,4 @@ public class GitConfigMonitor extends AbstractIdleService {
     return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
         .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup));
   }
-
-  /**
-   * Class for managing a git repository
-   */
-  private static class GitRepository {
-    private final static String CHECKPOINT_FILE = "checkpoint.txt";
-    private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp";
-    private final String repoUri;
-    private final String repoDir;
-    private final String branchName;
-    private Git git;
-    private String lastProcessedGitHash;
-    private String latestGitHash;
-
-    /**
-     * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoDir
-     * @param repoUri URI of repository
-     * @param repoDir Directory to hold the local copy of the repository
-     * @param branchName Branch name
-     * @throws GitAPIException
-     * @throws IOException
-     */
-    private GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException {
-      this.repoUri = repoUri;
-      this.repoDir = repoDir;
-      this.branchName = branchName;
-
-      initRepository();
-    }
-
-    /**
-     * Open the repository if it exists locally, otherwise clone it
-     * @throws GitAPIException
-     * @throws IOException
-     */
-    private void initRepository() throws GitAPIException, IOException {
-      File repoDirFile = new File(this.repoDir);
-
-      try {
-        this.git = Git.open(repoDirFile);
-
-        String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url");
-
-        if (!uri.equals(this.repoUri)) {
-          throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri);
-        }
-      } catch (RepositoryNotFoundException e) {
-        // if the repository was not found then clone a new one
-        this.git = Git.cloneRepository()
-            .setDirectory(repoDirFile)
-            .setURI(this.repoUri)
-            .setBranch(this.branchName)
-            .call();
-      }
-
-      try {
-        this.lastProcessedGitHash = readCheckpoint();
-      } catch (FileNotFoundException e) {
-        // if no checkpoint is available then start with the first commit
-        Iterable<RevCommit> logs = git.log().call();
-        RevCommit lastLog = null;
-
-        for (RevCommit log : logs) {
-          lastLog = log;
-        }
-
-        if (lastLog != null) {
-          this.lastProcessedGitHash = lastLog.getName();
-        }
-      }
-
-      this.latestGitHash = this.lastProcessedGitHash;
-    }
-
-    /**
-     * Read the last processed commit githash from the checkpoint file
-     * @return
-     * @throws IOException
-     */
-    private String readCheckpoint() throws IOException {
-      File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
-      return Files.toString(checkpointFile, Charsets.UTF_8);
-    }
-
-    /**
-     * Write the last processed commit githash to the checkpoint file
-     * @param gitHash
-     * @throws IOException
-     */
-    private void writeCheckpoint(String gitHash) throws IOException {
-      // write to a temporary name then rename to make the operation atomic when the file system allows a file to be
-      // replaced
-      File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP);
-      File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
-
-      Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8);
-
-      Files.move(tmpCheckpointFile, checkpointFile);
-    }
-
-    private void moveCheckpointAndHashesForward() throws IOException {
-      this.lastProcessedGitHash = this.latestGitHash;
-
-      writeCheckpoint(this.latestGitHash);
-    }
-
-    /**
-     *
-     * @throws GitAPIException
-     * @throws IOException
-     */
-    private List<DiffEntry> getChanges() throws GitAPIException, IOException {
-      // get tree for last processed commit
-      ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
-
-      // refresh to latest and reset hard to handle forced pushes
-      this.git.fetch().setRemote(REMOTE_NAME).call();
-      // reset hard to get a clean working set since pull --rebase may leave files around
-      this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call();
-
-      ObjectId head = this.git.getRepository().resolve("HEAD");
-      ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}");
-
-      // remember the hash for the current HEAD. This will be checkpointed after the diff is processed.
-      latestGitHash = head.getName();
-
-      // diff old and new heads to find changes
-      ObjectReader reader = this.git.getRepository().newObjectReader();
-      CanonicalTreeParser oldTreeIter = new CanonicalTreeParser();
-      oldTreeIter.reset(reader, oldHeadTree);
-      CanonicalTreeParser newTreeIter = new CanonicalTreeParser();
-      newTreeIter.reset(reader, headTree);
-
-      return this.git.diff()
-          .setNewTree(newTreeIter)
-          .setOldTree(oldTreeIter)
-          .setShowNameAndStatusOnly(true)
-          .call();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
new file mode 100644
index 0000000..c7dd226
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.diff.DiffEntry;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Service that monitors for changes to {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
+ * The git repository must have an initial commit that has no files since that is used as a base for getting
+ * the change list.
+ * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be organized with the following directory structure on git:
+ * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties
+ * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties
+ */
+@Slf4j
+public class GitFlowGraphMonitor extends GitMonitoringService {
+  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gitFlowGraphMonitor";
+
+  private static final String PROPERTIES_EXTENSIONS = "properties";
+  private static final String CONF_EXTENSIONS = StringUtils.EMPTY;
+  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
+
+  private static final int NODE_FILE_DEPTH = 3;
+  private static final int EDGE_FILE_DEPTH = 4;
+  private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
+          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+          .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+          .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+          .build());
+
+  private FSFlowCatalog flowCatalog;
+  private FlowGraph flowGraph;
+  private final Config emptyConfig = ConfigFactory.empty();
+
+  /**
+   * Create a {@link GitFlowGraphMonitor}. The monitor's settings are taken from the
+   * {@link #GIT_FLOWGRAPH_MONITOR_PREFIX} section of the supplied config; keys missing from that
+   * section are resolved against {@link #DEFAULT_FALLBACK}.
+   * @param config configuration containing the {@link #GIT_FLOWGRAPH_MONITOR_PREFIX} section
+   * @param flowCatalog catalog passed to the {@link FlowEdgeFactory} when flow edges are created
+   * @param graph the {@link FlowGraph} that node and edge changes are applied to
+   */
+  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) {
+    super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+    this.flowGraph = graph;
+    this.flowCatalog = flowCatalog;
+  }
+
+  /**
+   * Report whether this service should poll Git.
+   * @return the current value of the {@code isActive} flag
+   */
+  @Override
+  public boolean shouldPollGit() {
+    final boolean active = this.isActive;
+    return active;
+  }
+
+  /**
+   * Add an element (i.e., a {@link DataNode} or a {@link FlowEdge}) to
+   * the {@link FlowGraph} for an added, updated or modified node or edge file.
+   * The depth of the changed file's new path decides whether it is handled as a node file or
+   * as an edge file; files at any other depth are ignored.
+   * @param change the diff entry describing the changed file
+   */
+  @Override
+  public void addChange(DiffEntry change) {
+    final int fileDepth = new Path(change.getNewPath()).depth();
+    switch (fileDepth) {
+      case NODE_FILE_DEPTH:
+        addDataNode(change);
+        break;
+      case EDGE_FILE_DEPTH:
+        addFlowEdge(change);
+        break;
+      default:
+        // Neither a node file nor an edge file: nothing to add.
+        break;
+    }
+  }
+
+  /**
+   * Remove an element (i.e., either a {@link DataNode} or a {@link FlowEdge}) from the {@link FlowGraph} for
+   * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
+   * The depth of the changed file's old path decides whether it is handled as a node file or
+   * as an edge file; files at any other depth are ignored.
+   * @param change the diff entry describing the changed file
+   */
+  @Override
+  public void removeChange(DiffEntry change) {
+    final int fileDepth = new Path(change.getOldPath()).depth();
+    switch (fileDepth) {
+      case NODE_FILE_DEPTH:
+        removeDataNode(change);
+        break;
+      case EDGE_FILE_DEPTH:
+        removeFlowEdge(change);
+        break;
+      default:
+        // Neither a node file nor an edge file: nothing to remove.
+        break;
+    }
+  }
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
+   * (falling back to {@link FlowGraphConfigurationKeys#DEFAULT_DATA_NODE_CLASS}) to instantiate a {@link DataNode}
+   * from the node config file. Files that fail {@link #checkFilePath(String, int)} are skipped, and any
+   * failure while loading or instantiating the node is logged rather than propagated.
+   * @param change the diff entry whose new path points at the node file
+   */
+  private void addDataNode(DiffEntry change) {
+    String newPath = change.getNewPath();
+    if (!checkFilePath(newPath, NODE_FILE_DEPTH)) {
+      return;
+    }
+    Path nodeFilePath = new Path(this.repositoryDir, newPath);
+    try {
+      Config config = loadNodeFileWithOverrides(nodeFilePath);
+      Class<?> dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+      DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
+      if (!this.flowGraph.addDataNode(dataNode)) {
+        log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
+      }
+    } catch (Exception e) {
+      // Pass the exception as the trailing argument so the stack trace is logged, not just the message.
+      log.warn("Could not add DataNode defined in {} due to exception {}", newPath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts the nodeId of the
+   * {@link DataNode} from the node config and uses it to delete the associated {@link DataNode}.
+   * Files that fail {@link #checkFilePath(String, int)} are skipped. As in {@link #removeFlowEdge(DiffEntry)},
+   * a failure (for example a missing node id key) is logged rather than propagated, so one bad file
+   * does not prevent the remaining changes from being handled.
+   * @param change the diff entry whose old path points at the node file
+   */
+  private void removeDataNode(DiffEntry change) {
+    if (!checkFilePath(change.getOldPath(), NODE_FILE_DEPTH)) {
+      return;
+    }
+    Path nodeFilePath = new Path(this.repositoryDir, change.getOldPath());
+    try {
+      Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), nodeFilePath);
+      String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
+      if (!this.flowGraph.deleteDataNode(nodeId)) {
+        log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId);
+      }
+    } catch (Exception e) {
+      log.warn("Could not remove DataNode defined in {} due to exception {}", nodeFilePath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}. The method instantiates the {@link FlowEdgeFactory}
+   * named by {@link FlowGraphConfigurationKeys#FLOW_EDGE_FACTORY_CLASS} (falling back to
+   * {@link FlowGraphConfigurationKeys#DEFAULT_FLOW_EDGE_FACTORY_CLASS}) and uses it to build a
+   * {@link FlowEdge} from the edge config file. Files that fail {@link #checkFilePath(String, int)} are
+   * skipped, and any failure while loading or building the edge is logged rather than propagated.
+   * @param change the diff entry whose new path points at the edge file
+   */
+  private void addFlowEdge(DiffEntry change) {
+    String newPath = change.getNewPath();
+    if (!checkFilePath(newPath, EDGE_FILE_DEPTH)) {
+      return;
+    }
+    Path edgeFilePath = new Path(this.repositoryDir, newPath);
+    try {
+      Config config = loadEdgeFileWithOverrides(edgeFilePath);
+      Class<?> flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+      FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+      FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog);
+      if (!this.flowGraph.addFlowEdge(edge)) {
+        log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+      }
+    } catch (Exception e) {
+      // Pass the exception as the trailing argument so the stack trace is logged, not just the message.
+      log.warn("Could not add edge defined in {} due to exception {}", newPath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Remove a {@link FlowEdge} from the {@link FlowGraph}. The method reads the edge id stored under
+   * {@link FlowGraphConfigurationKeys#FLOW_EDGE_ID_KEY} in the edge config and uses it to delete the
+   * associated {@link FlowEdge}. Files that fail {@link #checkFilePath(String, int)} are skipped, and
+   * any failure is logged rather than propagated.
+   * @param change the diff entry whose old path points at the edge file
+   */
+  private void removeFlowEdge(DiffEntry change) {
+    if (!checkFilePath(change.getOldPath(), EDGE_FILE_DEPTH)) {
+      return;
+    }
+    Path edgeFilePath = new Path(this.repositoryDir, change.getOldPath());
+    try {
+      Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath);
+      String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
+      if (!this.flowGraph.deleteFlowEdge(edgeId)) {
+        log.warn("Could not remove FlowEdge {} from FlowGraph; skipping", edgeId);
+      }
+    } catch (Exception e) {
+      // Pass the exception as the trailing argument so the stack trace is logged, not just the message.
+      log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Verify that a changed file sits at the expected depth below the monitored folder and carries one of the
+   * accepted java properties extensions.
+   * @param file the relative path from the repo root
+   * @param depth the number of path components the file is expected to have
+   * @return true if the file conforms; false (after logging a warning) otherwise
+   */
+  private boolean checkFilePath(String file, int depth) {
+    // Expected layouts:
+    //   flowGraphDir/nodeName/nodeName.properties                 (node file)
+    //   flowGraphDir/sourceNode/destinationNode/edgeName.properties (edge file)
+    Path filePath = new Path(file);
+    String fileExtension = Files.getFileExtension(filePath.getName());
+
+    // Evaluation order matters: the hierarchy check walks up the path and is only safe once the depth matches
+    boolean conforms = filePath.depth() == depth
+        && checkFileLevelRelativeToRoot(filePath, depth)
+        && this.javaPropsExtensions.contains(fileExtension);
+    if (conforms) {
+      return true;
+    }
+
+    log.warn("Changed file does not conform to directory structure and file name format, skipping: "
+        + filePath);
+    return false;
+  }
+
+  /**
+   * Check that the ancestor found {@code depth - 1} levels above the file is the monitored folder.
+   * @param filePath path of the node/edge file
+   * @param depth expected depth of the file
+   * @return true if that ancestor's name equals the configured folder name; false otherwise or if the path is null
+   */
+  private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
+    if (filePath == null) {
+      return false;
+    }
+
+    // Climb (depth - 1) levels up from the file
+    Path ancestor = filePath;
+    for (int remaining = depth - 1; remaining > 0; remaining--) {
+      ancestor = ancestor.getParent();
+    }
+    return ancestor.getName().equals(folderName);
+  }
+
+  /**
+   * Return a copy of the node config in which data.node.id is set to the name of the directory containing the
+   * node file.
+   * @param nodeConfig node config
+   * @param nodeFilePath path of the node file
+   * @return config with overridden data.node.id
+   */
+  private Config getNodeConfigWithOverrides(Config nodeConfig, Path nodeFilePath) {
+    return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef(nodeFilePath.getParent().getName()));
+  }
+
+  /**
+   * Return a copy of the edge config whose source, destination and id are derived from the edge file path:
+   * the grandparent directory names the source, the parent directory names the destination, and the id joins
+   * both with the file name stripped of its extension.
+   * @param edgeConfig edge config
+   * @param edgeFilePath path of the edge file
+   * @return config with overridden edge properties
+   */
+  private Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
+    Path destinationDir = edgeFilePath.getParent();
+    String destination = destinationDir.getName();
+    String source = destinationDir.getParent().getName();
+    String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName());
+    String edgeId = getEdgeId(source, destination, edgeName);
+
+    Config withEndPoints = edgeConfig
+        .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
+        .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination));
+    return withEndPoints.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(edgeId));
+  }
+
+  /**
+   * Load a node file through the pull file loader and apply the path-derived node id override.
+   * @param filePath path of the node file; the node id is taken from the name of its parent directory
+   * @return the node configuration with data.node.id overridden
+   * @throws IOException if the file cannot be loaded
+   */
+  private Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+    return getNodeConfigWithOverrides(this.pullFileLoader.loadPullFile(filePath, emptyConfig, false), filePath);
+  }
+
+  /**
+   * Load an edge file through the pull file loader and apply the path-derived source, destination and id
+   * overrides.
+   * @param filePath path of the edge file (the add path passes it already resolved against the local repository
+   *                 directory)
+   * @return the edge configuration with the overridden edge properties
+   * @throws IOException if the file cannot be loaded
+   */
+  private Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+    return getEdgeConfigWithOverrides(this.pullFileLoader.loadPullFile(filePath, emptyConfig, false), filePath);
+  }
+
+  /**
+   * Build the edge id by joining source, destination and edge name with the flow edge label joiner.
+   * @param source source data node id
+   * @param destination destination data node id
+   * @param edgeName simple name of the edge (e.g. file name without extension of the edge file)
+   * @return a string label identifying the edge
+   */
+  private String getEdgeId(String source, String destination, String edgeName) {
+    Joiner labelJoiner = Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR);
+    return labelJoiner.join(source, destination, edgeName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
new file mode 100644
index 0000000..2361edc
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.ResetCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.diff.DiffEntry;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectReader;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.treewalk.CanonicalTreeParser;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PullFileLoader;
+
+
+@Slf4j
+public abstract class GitMonitoringService extends AbstractIdleService {
+  private static final String REMOTE_NAME = "origin";
+  private static final int TERMINATION_TIMEOUT = 30;
+
+  public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
+  public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+
+  private Integer pollingInterval;
+  protected final ScheduledExecutorService scheduledExecutor;
+  protected GitMonitoringService.GitRepository gitRepo;
+  protected String repositoryDir;
+  protected String folderName;
+  protected Path folderPath;
+  protected final PullFileLoader pullFileLoader;
+  protected final Set<String> javaPropsExtensions;
+  protected final Set<String> hoconFileExtensions;
+  protected volatile boolean isActive = false;
+
+  /**
+   * Create the monitoring service from configuration. The constructor opens (or clones) the git repository,
+   * builds a {@link PullFileLoader} rooted at the monitored folder inside the local repository directory, and
+   * creates the single-threaded scheduled executor used for polling. Polling itself only begins in
+   * {@link #startUp()}.
+   * @param config service configuration. Only the repository URI is checked explicitly; the repository directory,
+   *               branch name, polling interval, base directory and both extension lists are read directly.
+   *               NOTE(review): presumably callers supply defaults for those keys - confirm.
+   * @throws RuntimeException if the git repository cannot be opened or the pull file loader cannot be created
+   */
+  public GitMonitoringService(Config config) {
+    Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_MONITOR_REPO_URI),
+        ConfigurationKeys.GIT_MONITOR_REPO_URI + " needs to be specified.");
+
+    String repositoryUri = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_URI);
+    this.repositoryDir = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR);
+    String branchName = config.getString(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME);
+    this.pollingInterval = config.getInt(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL);
+    this.folderName = config.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR);
+
+    // Opening the repository happens eagerly; checked failures are rethrown unchecked
+    try {
+      this.gitRepo = new GitMonitoringService.GitRepository(repositoryUri, repositoryDir, branchName);
+    } catch (GitAPIException | IOException e) {
+      throw new RuntimeException("Could not open git repository", e);
+    }
+
+    this.folderPath = new Path(this.repositoryDir, this.folderName);
+    // Extension lists are comma-separated; entries are used as-is (no trimming)
+    this.javaPropsExtensions = Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(","));
+    this.hoconFileExtensions = Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
+    try {
+      this.pullFileLoader = new PullFileLoader(this.folderPath,
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
+          this.javaPropsExtensions, this.hoconFileExtensions);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create pull file loader", e);
+    }
+
+    this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
+  }
+
+  /**
+   * Record whether this monitor is active. Calling it with the current state is a no-op.
+   * @param isActive the new active state
+   */
+  synchronized void setActive(boolean isActive) {
+    if (this.isActive != isActive) {
+      this.isActive = isActive;
+    }
+  }
+
+
+  /**
+   * Start the service: schedule a task that runs immediately and then every {@code pollingInterval} seconds,
+   * and that processes git changes whenever {@link #shouldPollGit()} allows it.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    // Log the concrete subclass name, since this base class does not know which monitor it is
+    log.info("Starting the " + getClass().getSimpleName());
+    log.info("Polling git with interval {} ", this.pollingInterval);
+
+    // Schedule the job config fetch task
+    this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          if (shouldPollGit()) {
+            processGitConfigChanges();
+          }
+        } catch (Exception e) {
+          // Catch unchecked exceptions too: an exception escaping run() would make the executor
+          // suppress every subsequent execution, silently stopping the polling.
+          log.error("Failed to process git config changes", e);
+          // next run will try again since errors could be intermittent
+        }
+      }
+    }, 0, this.pollingInterval, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog}
+   * @throws GitAPIException
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public void processGitConfigChanges() throws GitAPIException, IOException {
+    List<DiffEntry> changes = this.gitRepo.getChanges();
+
+    for (DiffEntry change : changes) {
+      switch (change.getChangeType()) {
+        case ADD:
+        case MODIFY:
+          addChange(change);
+          break;
+        case DELETE:
+          removeChange(change);
+          break;
+        case RENAME:
+          removeChange(change);
+          addChange(change);
+          break;
+        default:
+          throw new RuntimeException("Unsupported change type " + change.getChangeType());
+      }
+    }
+
+    // Done processing changes, so checkpoint
+    this.gitRepo.moveCheckpointAndHashesForward();
+  }
+
+  /**
+   * Stop the service: stop scheduling new polls and wait up to {@code TERMINATION_TIMEOUT} seconds for a poll in
+   * progress to finish. If it does not finish in time, the executor is shut down forcibly so that a stuck poll
+   * thread is interrupted instead of being left running.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    this.scheduledExecutor.shutdown();
+    if (!this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
+      log.warn("Git polling task did not terminate within {} seconds; forcing shutdown", TERMINATION_TIMEOUT);
+      this.scheduledExecutor.shutdownNow();
+    }
+  }
+
+  /**
+   * Class for managing a git repository
+   */
+  static class GitRepository {
+    private final static String CHECKPOINT_FILE = "checkpoint.txt";
+    private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp";
+    private final String repoUri;
+    private final String repoDir;
+    private final String branchName;
+    private Git git;
+    private String lastProcessedGitHash;
+    private String latestGitHash;
+
+    /**
+     * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoUri.
+     * The repository is opened (or cloned) and the checkpoint is loaded as part of construction.
+     * @param repoUri URI of repository
+     * @param repoDir Directory to hold the local copy of the repository
+     * @param branchName Branch name
+     * @throws GitAPIException
+     * @throws IOException
+     */
+    GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException {
+      this.repoUri = repoUri;
+      this.repoDir = repoDir;
+      this.branchName = branchName;
+
+      initRepository();
+    }
+
+    /**
+     * Open the repository if it exists locally, otherwise clone it, then initialize the last processed hash from
+     * the checkpoint file. When no checkpoint file exists, processing starts from the last commit returned by the
+     * log (the oldest one), or stays unset if the log is empty.
+     * @throws GitAPIException
+     * @throws IOException
+     * @throws RuntimeException if the existing local repository points at a different remote URI
+     */
+    private void initRepository() throws GitAPIException, IOException {
+      File repoDirFile = new File(this.repoDir);
+
+      try {
+        this.git = Git.open(repoDirFile);
+
+        String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url");
+
+        // Compare from the configured side: uri is null when the local repository has no such remote,
+        // which must be reported as a mismatch rather than fail with a NullPointerException.
+        if (!this.repoUri.equals(uri)) {
+          throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri);
+        }
+      } catch (RepositoryNotFoundException e) {
+        // if the repository was not found then clone a new one
+        this.git = Git.cloneRepository()
+            .setDirectory(repoDirFile)
+            .setURI(this.repoUri)
+            .setBranch(this.branchName)
+            .call();
+      }
+
+      try {
+        this.lastProcessedGitHash = readCheckpoint();
+      } catch (FileNotFoundException e) {
+        // if no checkpoint is available then start with the first commit
+        Iterable<RevCommit> commits = git.log().call();
+        RevCommit firstCommit = null;
+
+        // the last entry of the iteration is kept
+        for (RevCommit commit : commits) {
+          firstCommit = commit;
+        }
+
+        if (firstCommit != null) {
+          this.lastProcessedGitHash = firstCommit.getName();
+        }
+      }
+
+      this.latestGitHash = this.lastProcessedGitHash;
+    }
+
+    /**
+     * Read the last processed commit githash from the checkpoint file in the local repository directory.
+     * @return the full content of the checkpoint file, decoded as UTF-8
+     * @throws IOException if the checkpoint file cannot be read
+     */
+    private String readCheckpoint() throws IOException {
+      return Files.toString(new File(this.repoDir, CHECKPOINT_FILE), Charsets.UTF_8);
+    }
+
+    /**
+     * Persist the last processed commit githash to the checkpoint file.
+     * @param gitHash the hash to record
+     * @throws IOException if writing or moving the file fails
+     */
+    private void writeCheckpoint(String gitHash) throws IOException {
+      File target = new File(this.repoDir, CHECKPOINT_FILE);
+      File staging = new File(this.repoDir, CHECKPOINT_FILE_TMP);
+
+      // Stage the content under a temporary name, then move it over the real file so that the update is atomic
+      // when the file system allows a file to be replaced
+      Files.write(gitHash, staging, Charsets.UTF_8);
+      Files.move(staging, target);
+    }
+
+    void moveCheckpointAndHashesForward() throws IOException {
+      this.lastProcessedGitHash = this.latestGitHash;
+
+      writeCheckpoint(this.latestGitHash);
+    }
+
+    /**
+     * Fetch from the remote, hard-reset the local working copy to the remote branch, and diff the tree of the
+     * last processed commit against the tree of the new HEAD. The new HEAD hash is remembered so that it can be
+     * checkpointed once the returned changes have been processed.
+     * @return the name-and-status diff entries between the last processed commit and the new HEAD
+     * @throws GitAPIException
+     * @throws IOException
+     */
+    List<DiffEntry> getChanges() throws GitAPIException, IOException {
+      // get tree for last processed commit
+      ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
+
+      // refresh to latest and reset hard to handle forced pushes
+      this.git.fetch().setRemote(REMOTE_NAME).call();
+      // reset hard to get a clean working set since pull --rebase may leave files around
+      this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call();
+
+      ObjectId head = this.git.getRepository().resolve("HEAD");
+      ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}");
+
+      // remember the hash for the current HEAD. This will be checkpointed after the diff is processed.
+      latestGitHash = head.getName();
+
+      // diff old and new heads to find changes; the reader is closed once the diff has been computed so that
+      // the resources it holds are released on every poll
+      try (ObjectReader reader = this.git.getRepository().newObjectReader()) {
+        CanonicalTreeParser oldTreeIter = new CanonicalTreeParser();
+        oldTreeIter.reset(reader, oldHeadTree);
+        CanonicalTreeParser newTreeIter = new CanonicalTreeParser();
+        newTreeIter.reset(reader, headTree);
+
+        return this.git.diff()
+            .setNewTree(newTreeIter)
+            .setOldTree(oldTreeIter)
+            .setShowNameAndStatusOnly(true)
+            .call();
+      }
+    }
+  }
+
+  /**
+   * Consulted by the scheduled polling task before every poll.
+   * @return true if git changes should be fetched and processed on this run
+   */
+  public abstract boolean shouldPollGit();
+
+  /**
+   * Invoked for every added or modified file, and for the new side of a rename (after
+   * {@link #removeChange(DiffEntry)} has been invoked for the same entry).
+   * @param change the diff entry to apply
+   */
+  public abstract void addChange(DiffEntry change);
+
+  /**
+   * Invoked for every deleted file, and for the old side of a rename (before
+   * {@link #addChange(DiffEntry)} is invoked for the same entry).
+   * @param change the diff entry to apply
+   */
+  public abstract void removeChange(DiffEntry change);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 12b6222..fc82cc1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,19 +17,19 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.security.UserGroupInformation;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -37,17 +37,12 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
 
 /**
  * An implementation of {@link FlowEdge}.
  */
 @Alpha
 public class BaseFlowEdge implements FlowEdge {
-  public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
-
   @Getter
   protected List<String> endPoints;
 
@@ -67,13 +62,13 @@ public class BaseFlowEdge implements FlowEdge {
   private boolean active;
 
   //Constructor
-  public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
+  public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
     this.endPoints = endPoints;
     this.flowTemplate = flowTemplate;
     this.executors = executors;
     this.active = active;
     this.props = properties;
-    this.id = generateEdgeId(endPoints, edgeName);
+    this.id = edgeId;
   }
 
   @Override
@@ -81,10 +76,6 @@ public class BaseFlowEdge implements FlowEdge {
     return true;
   }
 
-  @VisibleForTesting
-  protected static String generateEdgeId(List<String> endPoints, String edgeName) {
-    return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName);
-  }
   /**
    *   The {@link FlowEdge}s are the same if they have the same endpoints and both refer to the same {@FlowTemplate} i.e.
    *   the {@link FlowTemplate} uris are the same
@@ -100,11 +91,11 @@ public class BaseFlowEdge implements FlowEdge {
 
     FlowEdge that = (FlowEdge) o;
 
-    if(!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+    if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
       return false;
     }
 
-    if(!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) {
+    if (!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) {
       return false;
     }
     return true;
@@ -135,17 +126,17 @@ public class BaseFlowEdge implements FlowEdge {
     @Override
     public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
       try {
-        String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
+        String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
-        String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
+        String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination");
         List<String> endPoints = Lists.newArrayList(source, destination);
-        String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,"");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
+        String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a non-null or empty Id");
 
         List<Config> specExecutorConfigList = new ArrayList<>();
         boolean flag;
-        for(int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)) != false; i++) {
+        for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) {
           specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
         }
 
@@ -168,26 +159,12 @@ public class BaseFlowEdge implements FlowEdge {
           specExecutors.add(executor);
         }
         FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
-        return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive);
-      } catch(RuntimeException e) {
+        return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
+      } catch (RuntimeException e) {
         throw e;
       } catch (Exception e) {
         throw new FlowEdgeCreationException(e);
       }
     }
-
-    @Override
-    public String getEdgeId(Config edgeProps) throws IOException {
-      String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
-      String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty destination");
-      String edgeName =
-          ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
-      List<String> endPoints = Lists.newArrayList(source, destination);
-
-      return generateEdgeId(endPoints, edgeName);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 851e887..2977231 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -17,13 +17,9 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
 
 public interface FlowEdgeFactory {
   /**
@@ -36,13 +32,6 @@ public interface FlowEdgeFactory {
    */
   public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
 
-  /**
-   * Get an edge label from the edge properties
-   * @param edgeProps properties of the edge
-   * @return a string label identifying the edge
-   */
-  public String getEdgeId(Config edgeProps) throws IOException;
-
   public class FlowEdgeCreationException extends Exception {
     private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index 0d94e3f..cd4876a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -24,17 +24,22 @@ public class FlowGraphConfigurationKeys {
   /**
    *   {@link DataNode} configuration keys.
    */
+  public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
+  public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
   public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id";
   public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
 
   /**
    * {@link FlowEdge} configuration keys.
    */
+  public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
+  public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
   public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source";
   public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination";
+  public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
   public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
   public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
   public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
-  public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX +"specExecutors";
+  public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
   public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 6cd8bf2..71ad56d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -93,10 +93,11 @@ public class GitConfigMonitorTest {
     this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
     this.config = ConfigBuilder.create()
-        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
+        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
+            this.remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
         .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
-        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5)
+        .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
         .build();
 
     this.flowCatalog = new FlowCatalog(config);
@@ -153,7 +154,7 @@ public class GitConfigMonitorTest {
     Collection<Spec> specs = this.flowCatalog.getSpecs();
 
     Assert.assertTrue(specs.size() == 1);
-    FlowSpec spec = (FlowSpec)(specs.iterator().next());
+    FlowSpec spec = (FlowSpec) (specs.iterator().next());
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -176,7 +177,7 @@ public class GitConfigMonitorTest {
     Collection<Spec> specs = this.flowCatalog.getSpecs();
 
     Assert.assertTrue(specs.size() == 1);
-    FlowSpec spec = (FlowSpec)(specs.iterator().next());
+    FlowSpec spec = (FlowSpec) (specs.iterator().next());
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -234,13 +235,13 @@ public class GitConfigMonitorTest {
       }
     });
 
-    FlowSpec spec = (FlowSpec)specList.get(0);
+    FlowSpec spec = (FlowSpec) specList.get(0);
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
     Assert.assertEquals(spec.getConfig().getString("param1"), "value1");
 
-    spec = (FlowSpec)specList.get(1);
+    spec = (FlowSpec) specList.get(1);
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -279,13 +280,13 @@ public class GitConfigMonitorTest {
       }
     });
 
-    spec = (FlowSpec)specList.get(0);
+    spec = (FlowSpec) specList.get(0);
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
     Assert.assertEquals(spec.getConfig().getString("param1"), "value4");
 
-    spec = (FlowSpec)specList.get(1);
+    spec = (FlowSpec) specList.get(1);
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow3"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow3");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -325,7 +326,7 @@ public class GitConfigMonitorTest {
     specs = this.flowCatalog.getSpecs();
     Assert.assertTrue(specs.size() == 1);
 
-    FlowSpec spec = (FlowSpec)(specs.iterator().next());
+    FlowSpec spec = (FlowSpec) (specs.iterator().next());
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 926dd10..68b030b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -95,7 +95,7 @@ public class GobblinServiceManagerTest {
     Properties serviceCoreProperties = new Properties();
     serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
     serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
-    serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY , TEST_GOBBLIN_EXECUTOR_NAME);
+    serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".description",
         "StandaloneTestExecutor");
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".version",
@@ -108,9 +108,9 @@ public class GobblinServiceManagerTest {
         TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
 
     serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true);
-    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
-    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
-    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5);
+    serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
+    serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
+    serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5);
 
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
@@ -200,7 +200,7 @@ public class GobblinServiceManagerTest {
 
     Assert.assertEquals(flowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
-    Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
+    Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
     Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
     // Add this assert back when getFlowSpec() is changed to return the raw flow spec
@@ -228,7 +228,7 @@ public class GobblinServiceManagerTest {
 
     Assert.assertEquals(retrievedFlowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
     Assert.assertEquals(retrievedFlowConfig.getId().getFlowName(), TEST_FLOW_NAME);
-    Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
+    Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
     Assert.assertEquals(retrievedFlowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
     // Add this assert when getFlowSpec() is changed to return the raw flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 2);
@@ -279,7 +279,7 @@ public class GobblinServiceManagerTest {
     specs = this.gobblinServiceManager.flowCatalog.getSpecs();
     Assert.assertTrue(specs.size() == 1);
 
-    FlowSpec spec = (FlowSpec)(specs.iterator().next());
+    FlowSpec spec = (FlowSpec) (specs.iterator().next());
     Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
     Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 9738344..04f2270 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -51,6 +51,10 @@ public class BaseFlowGraphTest {
   private FlowEdge edge3;
   private FlowEdge edge3c;
 
+  private String edgeId1;
+  private String edgeId2;
+  private String edgeId3;
+
   BaseFlowGraph graph;
   @BeforeClass
   public void setUp()
@@ -58,33 +62,40 @@ public class BaseFlowGraphTest {
              IOException, DataNode.DataNodeCreationException {
     Properties properties = new Properties();
     properties.put("key1", "val1");
-    Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node1"));
+    Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef("node1"));
     node1 = new BaseDataNode(node1Config);
 
     properties = new Properties();
     properties.put("key2", "val2");
-    Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node2"));
+    Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef("node2"));
     node2 = new BaseDataNode(node2Config);
 
     properties = new Properties();
     properties.put("key3", "val3");
-    Config node3Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node3"));
+    Config node3Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef("node3"));
     node3 = new BaseDataNode(node3Config);
 
     //Create a clone of node3
     node3c = new BaseDataNode(node3Config);
 
-    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"),"","", ConfigFactory.empty(),null, null, null);
-    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"),"","", ConfigFactory.empty(),null, null, null);
-    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"),"","", ConfigFactory.empty(),null, null, null);
+    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
+    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
+    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
 
     //Create edge instances
-    edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), "edge1", flowTemplate1, null, ConfigFactory.empty(), true);
-    edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), "edge2", flowTemplate2, null, ConfigFactory.empty(), true);
-    edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true);
+    edgeId1 = "node1:node2:edge1";
+    edgeId2 = "node2:node3:edge2";
+    edgeId3 = "node3:node1:edge3";
+
+    edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), edgeId1, flowTemplate1, null, ConfigFactory.empty(), true);
+    edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), edgeId2, flowTemplate2, null, ConfigFactory.empty(), true);
+    edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, null, ConfigFactory.empty(), true);
 
     //Create a clone of edge3
-    edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true);
+    edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, null, ConfigFactory.empty(), true);
 
     //Create a FlowGraph
     graph = new BaseFlowGraph();
@@ -193,8 +204,7 @@ public class BaseFlowGraphTest {
 
   @Test (dependsOnMethods = "testDeleteDataNode")
   public void testDeleteFlowEdgeById() throws Exception {
-    String edgeLabel1 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node1", "node2"), "edge1");
-    Assert.assertTrue(graph.deleteFlowEdge(edgeLabel1));
+    Assert.assertTrue(graph.deleteFlowEdge(edgeId1));
     Assert.assertEquals(graph.getEdges("node1").size(), 0);
     Assert.assertEquals(graph.getEdges("node2").size(), 1);
     Assert.assertEquals(graph.getEdges("node3").size(), 1);
@@ -203,8 +213,7 @@ public class BaseFlowGraphTest {
     Assert.assertTrue(graph.getEdges("node2").contains(edge2));
     Assert.assertTrue(graph.getEdges("node3").contains(edge3));
 
-    String edgeLabel2 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node2", "node3"), "edge2");
-    Assert.assertTrue(graph.deleteFlowEdge(edgeLabel2));
+    Assert.assertTrue(graph.deleteFlowEdge(edgeId2));
     Assert.assertEquals(graph.getEdges("node1").size(), 0);
     Assert.assertEquals(graph.getEdges("node2").size(), 0);
     Assert.assertEquals(graph.getEdges("node3").size(), 1);
@@ -213,8 +222,7 @@ public class BaseFlowGraphTest {
     Assert.assertTrue(!graph.getEdges("node2").contains(edge2));
     Assert.assertTrue(graph.getEdges("node3").contains(edge3));
 
-    String edgeLabel3 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node3", "node1"), "edge3");
-    Assert.assertTrue(graph.deleteFlowEdge(edgeLabel3));
+    Assert.assertTrue(graph.deleteFlowEdge(edgeId3));
     Assert.assertEquals(graph.getEdges("node1").size(), 0);
     Assert.assertEquals(graph.getEdges("node2").size(), 0);
     Assert.assertEquals(graph.getEdges("node3").size(), 0);
@@ -223,8 +231,8 @@ public class BaseFlowGraphTest {
     Assert.assertTrue(!graph.getEdges("node2").contains(edge2));
     Assert.assertTrue(!graph.getEdges("node3").contains(edge3));
 
-    Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel1));
-    Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel2));
-    Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel3));
+    Assert.assertTrue(!graph.deleteFlowEdge(edgeId1));
+    Assert.assertTrue(!graph.deleteFlowEdge(edgeId2));
+    Assert.assertTrue(!graph.deleteFlowEdge(edgeId3));
   }
 }
\ No newline at end of file