You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/18 01:18:41 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1084] Refresh flowgraph when templates are modified
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9a5e222 [GOBBLIN-1084] Refresh flowgraph when templates are modified
9a5e222 is described below
commit 9a5e222f87a0a44903e6c2c30aaec365cc579979
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Mar 17 18:18:34 2020 -0700
[GOBBLIN-1084] Refresh flowgraph when templates are modified
Closes #2924 from jack-moseley/refresh-flowgraph
---
.../gobblin/service/modules/core/GitFlowGraphMonitor.java | 6 ++++++
.../gobblin/service/modules/core/GitMonitoringService.java | 2 +-
.../modules/template_catalog/FSFlowTemplateCatalog.java | 4 ++++
.../template_catalog/ObservingFSFlowEdgeTemplateCatalog.java | 10 ++++++++++
.../gobblin/service/modules/core/GitFlowGraphMonitorTest.java | 1 +
5 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 44dd01a..9c1d105 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
@@ -127,6 +128,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
*/
@Override
void processGitConfigChanges() throws GitAPIException, IOException {
+ if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
+ log.info("Change to template catalog detected, refreshing FlowGraph");
+ this.gitRepo.initRepository();
+ }
+
List<DiffEntry> changes = this.gitRepo.getChanges();
Collections.sort(changes, (o1, o2) -> {
Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index d577bdd..d455ad8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -304,7 +304,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
* @throws GitAPIException
* @throws IOException
*/
- private void initRepository() throws GitAPIException, IOException {
+ void initRepository() throws GitAPIException, IOException {
File repoDirFile = new File(this.repoDir);
try {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index 24e3ff2..e309026 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -185,4 +185,8 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
return true;
}
+
+ public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
+ return false;
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
index 9b68024..5c47388 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.fs.Path;
@@ -49,6 +50,8 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
private Map<URI, List<JobTemplate>> jobTemplateMap = new ConcurrentHashMap<>();
private ReadWriteLock rwLock;
+ private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
+
public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock rwLock) throws IOException {
super(sysConfig);
this.rwLock = rwLock;
@@ -92,14 +95,21 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
}
}
+ @Override
+ public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
+ return this.shouldRefreshFlowGraph.getAndSet(value);
+ }
+
/**
* Clear cached templates so they will be reloaded next time {@link #getFlowTemplate(URI)} is called.
+ * Also refresh git flow graph in case any edges that failed to be added on startup are successful now.
*/
private void clearTemplates() {
this.rwLock.writeLock().lock();
log.info("Change detected, reloading flow templates.");
flowTemplateMap.clear();
jobTemplateMap.clear();
+ getAndSetShouldRefreshFlowGraph(true);
this.rwLock.writeLock().unlock();
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index c07b56e..e7a6863 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;