You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 01:18:41 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1084] Refresh flowgraph when templates are modified

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a5e222  [GOBBLIN-1084] Refresh flowgraph when templates are modified
9a5e222 is described below

commit 9a5e222f87a0a44903e6c2c30aaec365cc579979
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Mar 17 18:18:34 2020 -0700

    [GOBBLIN-1084] Refresh flowgraph when templates are modified
    
    Closes #2924 from jack-moseley/refresh-flowgraph
---
 .../gobblin/service/modules/core/GitFlowGraphMonitor.java      |  6 ++++++
 .../gobblin/service/modules/core/GitMonitoringService.java     |  2 +-
 .../modules/template_catalog/FSFlowTemplateCatalog.java        |  4 ++++
 .../template_catalog/ObservingFSFlowEdgeTemplateCatalog.java   | 10 ++++++++++
 .../gobblin/service/modules/core/GitFlowGraphMonitorTest.java  |  1 +
 5 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 44dd01a..9c1d105 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -127,6 +128,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
    */
   @Override
   void processGitConfigChanges() throws GitAPIException, IOException {
+    if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
+      log.info("Change to template catalog detected, refreshing FlowGraph");
+      this.gitRepo.initRepository();
+    }
+
     List<DiffEntry> changes = this.gitRepo.getChanges();
     Collections.sort(changes, (o1, o2) -> {
       Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index d577bdd..d455ad8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -304,7 +304,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
      * @throws GitAPIException
      * @throws IOException
      */
-    private void initRepository() throws GitAPIException, IOException {
+    void initRepository() throws GitAPIException, IOException {
       File repoDirFile = new File(this.repoDir);
 
       try {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index 24e3ff2..e309026 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -185,4 +185,8 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
 
     return true;
   }
+
+  public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
+    return false;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
index 9b68024..5c47388 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.hadoop.fs.Path;
@@ -49,6 +50,8 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
   private Map<URI, List<JobTemplate>> jobTemplateMap = new ConcurrentHashMap<>();
   private ReadWriteLock rwLock;
 
+  private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
+
   public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock rwLock) throws IOException {
     super(sysConfig);
     this.rwLock = rwLock;
@@ -92,14 +95,21 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
     }
   }
 
+  @Override
+  public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
+    return this.shouldRefreshFlowGraph.getAndSet(value);
+  }
+
   /**
    * Clear cached templates so they will be reloaded next time {@link #getFlowTemplate(URI)} is called.
+   * Also refresh git flow graph in case any edges that failed to be added on startup are successful now.
    */
   private void clearTemplates() {
     this.rwLock.writeLock().lock();
     log.info("Change detected, reloading flow templates.");
     flowTemplateMap.clear();
     jobTemplateMap.clear();
+    getAndSetShouldRefreshFlowGraph(true);
     this.rwLock.writeLock().unlock();
   }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index c07b56e..e7a6863 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;