You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/04 18:30:34 UTC
[gobblin] branch master updated: [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a246e2386 [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)
a246e2386 is described below
commit a246e238603bf3adbd6d30ab04242174779cca99
Author: William Lo <lo...@gmail.com>
AuthorDate: Tue Oct 4 11:30:26 2022 -0700
[GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (#3548)
* Implement file based flowgraph that detects changes to the underlying filesystem and updates dynamically
* Fix checkstyle
* Address review, use atomic swap instead of mutable shared state for flowgraph
* Fix initialization to only load flowgraph after topology specstore is loaded and for better clarity on exceptions
* Change baseflowgraphlistener to a helper class so that it is extensible to future changes in flowgraph format, and can be leveraged properly by test classes
* Fix checkstyle
* Address review and add metrics for flow update failures
* Fix tests
* Migrate git flowgraph to load fully on change rather than by diff, address review
* Cleanup
* Fix tests
---
.../gobblin/configuration/ConfigurationKeys.java | 10 +-
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 +
.../service/modules/flow/MultiHopFlowCompiler.java | 42 +--
.../service/modules/flowgraph/BaseFlowGraph.java | 19 +-
...GraphListener.java => BaseFlowGraphHelper.java} | 184 ++++++------
.../FSPathAlterationFlowGraphListener.java | 96 +++++++
.../service/modules/flowgraph/FlowGraph.java | 17 +-
.../modules/flowgraph/FlowGraphMonitor.java | 1 -
.../service/monitoring/FsFlowGraphMonitor.java | 133 +++++++++
.../service/monitoring/GitConfigMonitor.java | 8 +-
.../service/monitoring/GitFlowGraphListener.java | 77 -----
.../service/monitoring/GitFlowGraphMonitor.java | 79 +++---
.../modules/flow/MultiHopFlowCompilerTest.java | 84 +-----
.../service/monitoring/FsFlowGraphMonitorTest.java | 315 +++++++++++++++++++++
.../monitoring/GitFlowGraphMonitorTest.java | 41 +--
.../util/callbacks/CallbacksDispatcher.java | 2 +-
...ionCatchingPathAlterationListenerDecorator.java | 73 +++--
.../util/filesystem/PathAlterationListener.java | 6 +
.../filesystem/PathAlterationListenerAdaptor.java | 3 +
.../util/filesystem/PathAlterationObserver.java | 30 +-
20 files changed, 838 insertions(+), 384 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00817be63..cb5f664e8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1022,8 +1022,14 @@ public class ConfigurationKeys {
* Configuration properties related to flowGraphs
*/
- public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
- public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+ public static final String FLOWGRAPH_JAVA_PROPS_EXTENSIONS = "flowGraph.javaPropsExtensions";
+ public static final String FLOWGRAPH_HOCON_FILE_EXTENSIONS = "flowGraph.hoconFileExtensions";
+ public static final String DEFAULT_PROPERTIES_EXTENSIONS = "properties";
+ public static final String DEFAULT_CONF_EXTENSIONS = "conf";
+ public static final String FLOWGRAPH_POLLING_INTERVAL = "flowGraph.pollingInterval";
+ public static final String FLOWGRAPH_BASE_DIR = "flowGraph.configBaseDirectory";
+ public static final String FLOWGRAPH_ABSOLUTE_DIR = "flowGraph.absoluteDirectory";
+
/***
* Configuration properties related to TopologySpec Store
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index eb8ff896c..94e2a82c6 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -54,4 +54,6 @@ public class ServiceMetricNames {
public static final String JOBS_SENT_TO_SPEC_EXECUTOR = "JobsSentToSpecExecutor";
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
+
+ public static final String FLOWGRAPH_UPDATE_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index dc3d0bc47..1ffef3969 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -35,9 +36,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -52,7 +54,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
@@ -72,6 +73,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
/***
@@ -81,8 +83,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Alpha
@Slf4j
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
- @Getter
- private final FlowGraph flowGraph;
+ private AtomicReference<FlowGraph> flowGraph;
@Getter
private ServiceManager serviceManager;
@Getter
@@ -111,6 +112,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
this(config, log, true);
}
+ public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
+ super(config, Optional.absent(), true);
+ this.flowGraph = flowGraph;
+ this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
+ }
+
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
try {
@@ -120,8 +127,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
} catch (RuntimeException e) {
MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
}
-
- this.flowGraph = new BaseFlowGraph(dataNodeAliasMap);
+ // Use atomic reference to avoid partial flowgraph upgrades during path compilation.
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph(dataNodeAliasMap));
Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = Optional.absent();
if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
@@ -152,7 +159,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
try {
String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName());
this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
- flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
+ flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -167,13 +174,6 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
}
}
- @VisibleForTesting
- MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
- super(config, Optional.absent(), true);
- this.flowGraph = flowGraph;
- this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
- }
-
/**
* Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing to start polling
* and processing changes
@@ -207,20 +207,20 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public Dag<JobExecutionPlan> compileFlow(Spec spec) {
Preconditions.checkNotNull(spec);
Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
-
+ FlowGraph graph = this.flowGraph.get();
long startTime = System.nanoTime();
FlowSpec flowSpec = (FlowSpec) spec;
String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
- DataNode sourceNode = this.flowGraph.getNode(source);
+ DataNode sourceNode = graph.getNode(source);
if (sourceNode == null) {
flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
return null;
}
List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
- List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
+ List<DataNode> destNodes = destNodeIds.stream().map(graph::getNode).collect(Collectors.toList());
if (destNodes.contains(null)) {
flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
return null;
@@ -252,7 +252,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
}
//Compute the path from source to destination.
- FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
+ FlowGraphPath flowGraphPath = graph.findPath(datasetFlowSpec);
if (flowGraphPath != null) {
//Convert the path into a Dag of JobExecutionPlans.
jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
@@ -290,6 +290,10 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
return jobExecutionPlanDag;
}
+ public void setFlowGraph(FlowGraph flowGraph) {
+ this.flowGraph.set(flowGraph);
+ }
+
/**
* If {@link FlowSpec} has {@link ConfigurationKeys#DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a
* provided base input and base output path to generate multiple source/destination paths.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 9f5ef0292..e54fd7cc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -45,12 +45,13 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Alpha
@Slf4j
public class BaseFlowGraph implements FlowGraph {
+ // Synchronize read/write access while the flowgraph is in the middle of an update
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
- private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
- private Map<String, DataNode> dataNodeMap = new HashMap<>();
- private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
- private Map<String, String> dataNodeAliasMap;
+ private final Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
+ private final Map<String, DataNode> dataNodeMap = new HashMap<>();
+ private final Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+ private final Map<String, String> dataNodeAliasMap;
public BaseFlowGraph() {
this(new HashMap<>());
@@ -239,15 +240,17 @@ public class BaseFlowGraph implements FlowGraph {
/**{@inheritDoc}**/
@Override
- public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException {
+ public FlowGraphPath findPath(FlowSpec flowSpec)
+ throws PathFinder.PathFinderException, ReflectiveOperationException {
try {
rwLock.readLock().lock();
//Instantiate a PathFinder.
- Class pathFinderClass = Class.forName(ConfigUtils
- .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
+ Class pathFinderClass = Class.forName(
+ ConfigUtils.getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
PathFinder pathFinder =
- (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec, dataNodeAliasMap);
+ (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec,
+ dataNodeAliasMap);
return pathFinder.findPath();
} finally {
rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
similarity index 63%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
rename to gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index fa249c4cd..4f0b82779 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.flowgraph;
+import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -28,7 +29,6 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -38,7 +38,13 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
@@ -47,55 +53,60 @@ import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
/**
- * Provides the common set of functionalities needed by listeners of {@link FlowGraphMonitor} to read changes in files and
+ * Provides the common set of functionalities needed by {@link FlowGraphMonitor} to read changes in files and
* apply them to a {@link FlowGraph}
* Assumes that the directory structure between flowgraphs configuration files are the same.
*/
@Slf4j
-public abstract class BaseFlowGraphListener {
- protected FlowGraph flowGraph;
- protected static final int NODE_FILE_DEPTH = 3;
- protected static final int EDGE_FILE_DEPTH = 4;
+public class BaseFlowGraphHelper {
+ public static final int NODE_FILE_DEPTH = 3;
+ public static final int EDGE_FILE_DEPTH = 4;
private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
final String baseDirectory;
private final Config emptyConfig = ConfigFactory.empty();
- private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+ private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
private final Map<URI, TopologySpec> topologySpecMap;
-
+ private MetricContext metricContext;
final String flowGraphFolderName;
final PullFileLoader pullFileLoader;
final Set<String> javaPropsExtensions;
final Set<String> hoconFileExtensions;
+ private final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
- public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
- FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
- String javaPropsExtentions, String hoconFileExtensions) {
+ public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+ Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
+ String javaPropsExtentions, String hoconFileExtensions, boolean instrumentationEnabled, Config config) {
this.flowTemplateCatalog = flowTemplateCatalog;
- this.flowGraph = graph;
this.topologySpecMap = topologySpecMap;
this.baseDirectory = baseDirectory;
this.flowGraphFolderName = flowGraphFolderName;
Path folderPath = new Path(baseDirectory, this.flowGraphFolderName);
this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
+ if (instrumentationEnabled) {
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), BaseFlowGraphHelper.class);
+ this.flowGraphUpdateFailedMeter = Optional.of(this.metricContext.contextAwareMeter(ServiceMetricNames.FLOWGRAPH_UPDATE_FAILED_METER));
+ } else {
+ this.flowGraphUpdateFailedMeter = Optional.absent();
+ }
try {
this.pullFileLoader = new PullFileLoader(folderPath,
- FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
- this.javaPropsExtensions, this.hoconFileExtensions);
+ FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), this.javaPropsExtensions,
+ this.hoconFileExtensions);
} catch (IOException e) {
throw new RuntimeException("Could not create pull file loader", e);
}
}
+
/**
* Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
* to instantiate a {@link DataNode} from the node config file.
* @param path of node to add
*/
- protected void addDataNode(String path) {
+ protected void addDataNode(FlowGraph graph, String path) {
if (checkFilePath(path, NODE_FILE_DEPTH)) {
Path nodeFilePath = new Path(this.baseDirectory, path);
try {
@@ -103,52 +114,39 @@ public abstract class BaseFlowGraphListener {
Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
- if (!this.flowGraph.addDataNode(dataNode)) {
+ if (!graph.addDataNode(dataNode)) {
log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
} else {
log.info("Added Datanode {} to FlowGraph", dataNode.getId());
}
} catch (Exception e) {
+ if (this.flowGraphUpdateFailedMeter.isPresent()) {
+ this.flowGraphUpdateFailedMeter.get().mark();
+ }
log.warn("Could not add DataNode defined in {} due to exception {}", path, e);
}
}
}
- /**
- * Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts the nodeId of the
- * {@link DataNode} from the node config file and uses it to delete the associated {@link DataNode}.
- * @param path of node to delete
- */
- protected void removeDataNode(String path) {
- if (checkFilePath(path, NODE_FILE_DEPTH)) {
- Path nodeFilePath = new Path(this.baseDirectory, path);
- Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), nodeFilePath);
- String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
- if (!this.flowGraph.deleteDataNode(nodeId)) {
- log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId);
- } else {
- log.info("Removed DataNode {} from FlowGraph", nodeId);
- }
- }
- }
-
/**
* Add a {@link FlowEdge} to the {@link FlowGraph}. The method uses the {@link FlowEdgeFactory} instance
* provided by the {@link FlowGraph} to build a {@link FlowEdge} from the edge config file.
* @param path of edge to add
*/
- protected void addFlowEdge(String path) {
+ protected void addFlowEdge(FlowGraph graph, String path) {
if (checkFilePath(path, EDGE_FILE_DEPTH)) {
Path edgeFilePath = new Path(this.baseDirectory, path);
try {
Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
- Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
- FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
- FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
+ Class flowEdgeFactoryClass = Class.forName(
+ ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+ FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+ FlowEdgeFactory flowEdgeFactory =
+ (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
if (flowTemplateCatalog.isPresent()) {
FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowTemplateCatalog.get(), specExecutors);
- if (!this.flowGraph.addFlowEdge(edge)) {
+ if (!graph.addFlowEdge(edge)) {
log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
} else {
log.info("Added edge {} to FlowGraph", edge.getId());
@@ -157,37 +155,17 @@ public abstract class BaseFlowGraphListener {
log.warn("Could not add edge defined in {} to FlowGraph as FlowTemplateCatalog is absent", path);
}
} catch (Exception e) {
- log.warn("Could not add edge defined in {} due to exception {}", path, e.getMessage());
- }
- }
- }
-
- /**
- * Remove a {@link FlowEdge} from the {@link FlowGraph}. The method uses {@link FlowEdgeFactory}
- * to construct the edgeId of the {@link FlowEdge} from the config file and uses it to delete the associated
- * {@link FlowEdge}.
- * @param path of edge to delete
- */
- protected void removeFlowEdge(String path) {
- if (checkFilePath(path, EDGE_FILE_DEPTH)) {
- Path edgeFilePath = new Path(this.baseDirectory, path);
- try {
- Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath);
- String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
- if (!this.flowGraph.deleteFlowEdge(edgeId)) {
- log.warn("Could not remove edge {} from FlowGraph; skipping", edgeId);
- } else {
- log.info("Removed edge {} from FlowGraph", edgeId);
+ log.warn("Could not add edge defined in {} due to exception", path, e);
+ if (this.flowGraphUpdateFailedMeter.isPresent()) {
+ this.flowGraphUpdateFailedMeter.get().mark();
}
- } catch (Exception e) {
- log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage());
}
}
}
/**
- * check whether the file has the proper naming and hierarchy
- * @param file the relative path from the repo root
+ * check whether the file has the proper naming and hierarchy for nodes and edges
+ * @param file the relative path from the root of the flowgraph
* @return false if the file does not conform
*/
private boolean checkFilePath(String file, int depth) {
@@ -197,10 +175,9 @@ public abstract class BaseFlowGraphListener {
Path filePath = new Path(file);
String fileExtension = Files.getFileExtension(filePath.getName());
- if (filePath.depth() != depth || !checkFileLevelRelativeToRoot(filePath, depth)
- || !(this.javaPropsExtensions.contains(fileExtension) || this.hoconFileExtensions.contains(fileExtension))) {
- log.warn("Changed file does not conform to directory structure and file name format, skipping: "
- + filePath);
+ if (!checkFileLevelRelativeToRoot(filePath, depth) || !(this.javaPropsExtensions.contains(fileExtension)
+ || this.hoconFileExtensions.contains(fileExtension))) {
+ log.warn("Changed file does not conform to directory structure and file name format, skipping: " + filePath);
return false;
}
return true;
@@ -212,7 +189,7 @@ public abstract class BaseFlowGraphListener {
* @param depth expected depth of the file
* @return true if the file conforms to the expected hierarchy
*/
- private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
+ public boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
if (filePath == null) {
return false;
}
@@ -220,10 +197,7 @@ public abstract class BaseFlowGraphListener {
for (int i = 0; i < depth - 1; i++) {
path = path.getParent();
}
- if (!path.getName().equals(flowGraphFolderName)) {
- return false;
- }
- return true;
+ return path.getName().equals(flowGraphFolderName);
}
/**
@@ -243,14 +217,15 @@ public abstract class BaseFlowGraphListener {
* @param edgeFilePath path of the edge file
* @return config with overridden edge properties
*/
- private Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
+ protected Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
String source = edgeFilePath.getParent().getParent().getName();
String destination = edgeFilePath.getParent().getName();
String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName());
return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination))
- .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
+ .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
+ ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
}
/**
@@ -259,13 +234,18 @@ public abstract class BaseFlowGraphListener {
* @param edgeConfig containing the logical names of SpecExecutors for this edge.
* @return a {@link List<SpecExecutor>}s for this edge.
*/
- private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxException {
+ private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
+ throws URISyntaxException, IOException {
//Get the logical names of SpecExecutors where the FlowEdge can be executed.
- List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+ List<String> specExecutorNames =
+ ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
//Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog.
List<SpecExecutor> specExecutors = new ArrayList<>(specExecutorNames.size());
- for (String specExecutorName: specExecutorNames) {
+ for (String specExecutorName : specExecutorNames) {
URI specExecutorUri = new URI(specExecutorName);
+ if (!this.topologySpecMap.containsKey(specExecutorUri)) {
+ throw new IOException(String.format("Spec executor %s does not exist in the topologySpecStore.", specExecutorUri));
+ }
specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
}
return specExecutors;
@@ -277,23 +257,60 @@ public abstract class BaseFlowGraphListener {
* @return the configuration object
* @throws IOException
*/
- protected Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadNodeFileWithOverrides(Path filePath)
+ throws IOException {
Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
return getNodeConfigWithOverrides(nodeConfig, filePath);
}
-
/**
* Load the edge file.
* @param filePath path of the edge file relative to the repository root
* @return the configuration object
* @throws IOException
*/
- protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadEdgeFileWithOverrides(Path filePath)
+ throws IOException {
Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
return getEdgeConfigWithOverrides(edgeConfig, filePath);
}
+ /**
+ * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys#FLOWGRAPH_BASE_DIR}.
+ * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+ * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+ * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+ */
+ public FlowGraph generateFlowGraph() {
+ FlowGraph newFlowGraph = new BaseFlowGraph();
+ java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
+ try {
+ List<Path> edges = new ArrayList<>();
+ // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
+ // Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
+ java.nio.file.Files.walk(graphPath).forEach(fileName -> {
+ if (!java.nio.file.Files.isDirectory(fileName)) {
+ if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
+ addDataNode(newFlowGraph, fileName.toString());
+ } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+ edges.add(new Path(fileName.toString()));
+ }
+ }
+ });
+ for (Path edge : edges) {
+ addFlowEdge(newFlowGraph, edge.toString());
+ }
+ return newFlowGraph;
+ } catch (IOException e) {
+ // Log and report error, but do not break or crash the flowgraph so that currently running flows can continue
+ if (this.flowGraphUpdateFailedMeter.isPresent()) {
+ this.flowGraphUpdateFailedMeter.get().mark();
+ }
+ log.error(String.format("Error while populating file based flowgraph at path %s", graphPath), e);
+ return null;
+ }
+ }
+
/**
* Get an edge label from the edge properties
* @param source source data node id
@@ -301,8 +318,7 @@ public abstract class BaseFlowGraphListener {
* @param edgeName simple name of the edge (e.g. file name without extension of the edge file)
* @return a string label identifying the edge
*/
- private String getEdgeId(String source, String destination, String edgeName) {
+ public String getEdgeId(String source, String destination, String edgeName) {
return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(source, destination, edgeName);
}
-
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
new file mode 100644
index 000000000..1700d8805
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+
+
+/**
+ * An implementation of {@link PathAlterationListener} that listens for changes in a directory and applies them to a GaaS FlowGraph.
+ * It is invoked by {@link PathAlterationObserver}, which checks a folder and recursively compares its files against
+ * their last polled state. When a check detects any difference in the files, the {@link FlowGraph} is reloaded.
+ *
+ */
+@Slf4j
+public class FSPathAlterationFlowGraphListener implements PathAlterationListener {
+ private final MultiHopFlowCompiler compiler;
+ private final BaseFlowGraphHelper flowGraphHelper;
+
+ public FSPathAlterationFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+ MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper flowGraphHelper) {
+ this.flowGraphHelper = flowGraphHelper;
+ File graphDir = new File(baseDirectory);
+ // Fail fast if the flowgraph directory does not exist
+ if (!graphDir.exists()) {
+ throw new RuntimeException(String.format("Flowgraph directory at path %s does not exist!", graphDir));
+ }
+ this.compiler = compiler;
+ }
+
+ @Override
+ public void onStart(final PathAlterationObserver observer) {
+ }
+
+ @Override
+ public void onFileCreate(final Path path) {
+ }
+
+ @Override
+ public void onFileChange(final Path path) {
+ }
+
+ @Override
+ public void onStop(final PathAlterationObserver observer) {
+ }
+
+ @Override
+ public void onDirectoryCreate(final Path directory) {
+ }
+
+ @Override
+ public void onDirectoryChange(final Path directory) {
+ }
+
+ @Override
+ public void onDirectoryDelete(final Path directory) {
+ }
+
+ @Override
+ public void onFileDelete(final Path path) {
+ }
+
+ @Override
+ public void onCheckDetectedChange() {
+ log.info("Detecting change in flowgraph files, reloading flowgraph");
+ FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
+ if (newGraph != null) {
+ this.compiler.setFlowGraph(newGraph);
+ }
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 47af72677..2f0a81264 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -39,35 +39,35 @@ public interface FlowGraph {
* @param nodeId {@link DataNode} identifier.
* @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
*/
- public DataNode getNode(String nodeId);
+ DataNode getNode(String nodeId);
/**
* Add a {@link DataNode} to the {@link FlowGraph}
* @param node {@link DataNode} to be added
* @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
*/
- public boolean addDataNode(DataNode node);
+ boolean addDataNode(DataNode node);
/**
* Add a {@link FlowEdge} to the {@link FlowGraph}
* @param edge {@link FlowEdge} to be added
* @return true if {@link FlowEdge} is added to the {@link FlowGraph} successfully.
*/
- public boolean addFlowEdge(FlowEdge edge);
+ boolean addFlowEdge(FlowEdge edge);
/**
* Remove a {@link DataNode} and all its incident edges from the {@link FlowGraph}
* @param nodeId identifier of the {@link DataNode} to be removed
* @return true if {@link DataNode} is removed from the {@link FlowGraph} successfully.
*/
- public boolean deleteDataNode(String nodeId);
+ boolean deleteDataNode(String nodeId);
/**
* Remove a {@link FlowEdge} from the {@link FlowGraph}
* @param edgeId label of the edge to be removed
* @return true if edge is removed from the {@link FlowGraph} successfully.
*/
- public boolean deleteFlowEdge(String edgeId);
+ boolean deleteFlowEdge(String edgeId);
/**
* Get a collection of edges adjacent to a {@link DataNode}. Useful for path finding algorithms and graph
@@ -75,14 +75,14 @@ public interface FlowGraph {
* @param nodeId identifier of the {@link DataNode}
* @return a collection of edges adjacent to the {@link DataNode}
*/
- public Collection<FlowEdge> getEdges(String nodeId);
+ Collection<FlowEdge> getEdges(String nodeId);
/**
* Get a collection of edges adjacent to a {@link DataNode}.
* @param node {@link DataNode}
* @return a collection of edges adjacent to the {@link DataNode}
*/
- public Collection<FlowEdge> getEdges(DataNode node);
+ Collection<FlowEdge> getEdges(DataNode node);
/**
* A method that takes a {@link FlowSpec} containing the source and destination {@link DataNode}s, as well as the
@@ -95,5 +95,6 @@ public interface FlowGraph {
* @return an instance of {@link FlowGraphPath} that encapsulates a sequence of {@link org.apache.gobblin.runtime.api.JobSpec}s
* satisfying flowSpec.
*/
- public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException;
+ FlowGraphPath findPath(FlowSpec flowSpec)
+ throws PathFinder.PathFinderException, ReflectiveOperationException;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
index 7c39b0157..bdf551172 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
@@ -30,5 +30,4 @@ public interface FlowGraphMonitor extends Service {
* @param value whether GaaS is ready
*/
void setActive(boolean value);
-
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
new file mode 100644
index 000000000..6183e7d66
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+ public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+ private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR = "/tmp/fsFlowgraph";
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+ private volatile boolean isActive = false;
+ private final long pollingInterval;
+ private BaseFlowGraphHelper flowGraphHelper;
+ private final PathAlterationObserverScheduler pathAlterationDetector;
+ private final FSPathAlterationFlowGraphListener listener;
+ private final PathAlterationObserver observer;
+ private final Path flowGraphPath;
+ private final MultiHopFlowCompiler compiler;
+ private final CountDownLatch initComplete;
+ private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+ .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+ .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
+
+ public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+ MultiHopFlowCompiler compiler, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete, boolean instrumentationEnabled)
+ throws IOException {
+ Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+ this.pollingInterval =
+ TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+ this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
+ this.observer = new PathAlterationObserver(flowGraphPath);
+ this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+ this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, compiler, flowGraphPath.toString(), this.flowGraphHelper);
+
+ this.compiler = compiler;
+ this.initComplete = initComplete;
+
+ if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
+ this.pathAlterationDetector = null;
+ } else {
+ this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
+ Optional<PathAlterationObserver> observerOptional = Optional.fromNullable(observer);
+ this.pathAlterationDetector.addPathAlterationObserver(this.listener, observerOptional,
+ this.flowGraphPath);
+ }
+ }
+
+ @Override
+ protected void startUp()
+ throws IOException {
+ }
+
+ @Override
+ public synchronized void setActive(boolean isActive) {
+ log.info("Setting the flow graph monitor to be " + isActive + " from " + this.isActive);
+ if (this.isActive == isActive) {
+ // No-op if already in correct state
+ return;
+ } else if (isActive) {
+ if (this.pathAlterationDetector != null) {
+ log.info("Starting the " + getClass().getSimpleName());
+ log.info("Polling flowgraph folder with interval {} ", this.pollingInterval);
+ try {
+ this.pathAlterationDetector.start();
+ // Manually instantiate flowgraph when the monitor becomes active
+ this.compiler.setFlowGraph(this.flowGraphHelper.generateFlowGraph());
+ // Reduce the countdown latch
+ this.initComplete.countDown();
+ log.info("Finished populating FSFlowgraph");
+ } catch (IOException e) {
+ log.error("Could not initialize pathAlterationDetector due to error: ", e);
+ }
+ } else {
+ log.warn("No path alteration detector found");
+ }
+ }
+ this.isActive = isActive;
+ }
+
+ /** Stop the service. */
+ @Override
+ protected void shutDown()
+ throws Exception {
+ this.pathAlterationDetector.stop();
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
index e7f639ce8..0d47e536d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
@@ -54,8 +54,8 @@ public class GitConfigMonitor extends GitMonitoringService {
.put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
.put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
.put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
- .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
- .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+ .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+ .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
.build());
private final FlowCatalog flowCatalog;
@@ -66,8 +66,8 @@ public class GitConfigMonitor extends GitMonitoringService {
this.flowCatalog = flowCatalog;
Config configWithFallbacks = config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
this.listeners.add(new GitConfigListener(flowCatalog, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
- configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
- configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS)));
+ configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS)));
}
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
deleted file mode 100644
index 1f5a16e7d..000000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.eclipse.jgit.diff.DiffEntry;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphListener;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
-
-
-/**
- * Listener for {@link GitFlowGraphMonitor} to apply changes from Git to a {@link FlowGraph}
- */
-public class GitFlowGraphListener extends BaseFlowGraphListener implements GitDiffListener {
-
- public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
- FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String folderName, String javaPropsExtentions,
- String hoconFileExtentions) {
- super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, folderName, javaPropsExtentions, hoconFileExtentions);
-
- }
-
- /**
- * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
- * the {@link FlowGraph} for an added, updated or modified node or edge file.
- * @param change
- */
- @Override
- public void addChange(DiffEntry change) {
- Path path = new Path(change.getNewPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- addDataNode(change.getNewPath());
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- addFlowEdge(change.getNewPath());
- }
- }
-
- /**
- * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge} from the {@link FlowGraph} for
- * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
- * @param change
- */
- @Override
- public void removeChange(DiffEntry change) {
- Path path = new Path(change.getOldPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- removeDataNode(change.getOldPath());
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- removeFlowEdge(change.getOldPath());
- }
- }
-}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
index bfe811fa0..684ab2c90 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -18,14 +18,11 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import org.apache.hadoop.fs.Path;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.diff.DiffEntry;
@@ -33,16 +30,20 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
/**
* Service that monitors for changes to {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
* The git repository must have an initial commit that has no files since that is used as a base for getting
@@ -54,8 +55,6 @@ import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog
@Slf4j
public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGraphMonitor {
public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.gitFlowGraphMonitor";
- private static final String PROPERTIES_EXTENSIONS = "properties";
- private static final String CONF_EXTENSIONS = "conf";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
@@ -67,26 +66,26 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
.put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
.put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
.put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
- .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
- .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
- .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
- .put(SHOULD_CHECKPOINT_HASHES, false)
- .build());
+ .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+ .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+ .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+ .put(SHOULD_CHECKPOINT_HASHES, false).build());
- private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+ private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
private final CountDownLatch initComplete;
+ private final BaseFlowGraphHelper flowGraphHelper;
+ private final MultiHopFlowCompiler multihopFlowCompiler;
- public GitFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
- FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+ public GitFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog, MultiHopFlowCompiler compiler
+ , Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete, boolean instrumentationEnabled) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
Config configWithFallbacks = config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
this.flowTemplateCatalog = flowTemplateCatalog;
this.initComplete = initComplete;
- this.listeners.add(new GitFlowGraphListener(
- flowTemplateCatalog, graph, topologySpecMap, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
- configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
- configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS))
- );
+ this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+ configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+ this.multihopFlowCompiler = compiler;
}
/**
@@ -99,39 +98,27 @@ public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGra
}
/**
- * Sort the changes in a commit so that changes to node files appear before changes to edge files. This is done so that
- * node related changes are applied to the FlowGraph before edge related changes. An example where the order matters
- * is the case when a commit adds a new node n2 as well as adds an edge from an existing node n1 to n2. To ensure that the
- * addition of edge n1->n2 is successful, node n2 must exist in the graph and so needs to be added first. For deletions,
- * the order does not matter and ordering the changes in the commit will result in the same FlowGraph state as if the changes
- * were unordered. In other words, deletion of a node deletes all its incident edges from the FlowGraph. So processing an
- * edge deletion later results in a no-op. Note that node and edge files do not change depth in case of modifications.
- *
- * If there are multiple commits between successive polls to Git, the re-ordering of changes across commits should not
- * affect the final state of the FlowGraph. This is because, the order of changes for a given file type (i.e. node or edge)
- * is preserved.
+ * Reprocesses the entire flowgraph from the root folder every time a change in git is detected
*/
@Override
- void processGitConfigChanges() throws GitAPIException, IOException {
+ void processGitConfigChanges()
+ throws GitAPIException, IOException {
+ // Pulls repository to latest and grabs changes
+ List<DiffEntry> changes = this.gitRepo.getChanges();
if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
log.info("Change to template catalog detected, refreshing FlowGraph");
this.gitRepo.initRepository();
+ } else if (changes.isEmpty()) {
+ return;
}
+ log.info("Detected changes in flowGraph, refreshing Flowgraph");
- List<DiffEntry> changes = this.gitRepo.getChanges();
- changes.sort(new GitFlowgraphComparator());
- processGitConfigChangesHelper(changes);
- //Decrements the latch count. The countdown latch is initialized to 1. So after the first time the latch is decremented,
- // the following operation should be a no-op.
- this.initComplete.countDown();
- }
-
- static class GitFlowgraphComparator implements Comparator<DiffEntry>, Serializable {
- public int compare(DiffEntry o1, DiffEntry o2) {
- Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
- Integer o2Depth = (o2.getNewPath() != null) ? (new Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
- return o1Depth.compareTo(o2Depth);
+ FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
+ if (newGraph != null) {
+ this.multihopFlowCompiler.setFlowGraph(newGraph);
}
+ // Noop if flowgraph is already initialized
+ this.initComplete.countDown();
+ this.gitRepo.moveCheckpointAndHashesForward();
}
-
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 0f5a66753..4c3d4f137 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,7 +33,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -41,12 +43,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -54,7 +50,6 @@ import org.testng.annotations.Test;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.typesafe.config.Config;
@@ -64,7 +59,6 @@ import com.typesafe.config.ConfigSyntax;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -75,7 +69,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
@@ -95,7 +88,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Slf4j
public class MultiHopFlowCompilerTest {
- private FlowGraph flowGraph;
+ private AtomicReference<FlowGraph> flowGraph;
private MultiHopFlowCompiler specCompiler;
private final String TESTDIR = "/tmp/mhCompiler/gitFlowGraphTestDir";
@@ -103,7 +96,7 @@ public class MultiHopFlowCompilerTest {
public void setUp()
throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
//Create a FlowGraph
- this.flowGraph = new BaseFlowGraph();
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
//Add DataNodes to the graph from the node properties files
URI dataNodesUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
@@ -119,7 +112,7 @@ public class MultiHopFlowCompilerTest {
Class dataNodeClass = Class.forName(ConfigUtils
.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
- this.flowGraph.addDataNode(dataNode);
+ this.flowGraph.get().addDataNode(dataNode);
}
}
@@ -153,7 +146,7 @@ public class MultiHopFlowCompilerTest {
specExecutors.add(topologySpecMap.get(new URI(specExecutorName)).getSpecExecutor());
}
FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog, specExecutors);
- this.flowGraph.addFlowEdge(edge);
+ this.flowGraph.get().addFlowEdge(edge);
}
}
this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
@@ -400,7 +393,7 @@ public class MultiHopFlowCompilerTest {
@Test (dependsOnMethods = "testCompileFlowWithRetention")
public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException {
//Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
+ this.flowGraph.get().deleteFlowEdge("HDFS-1_HDFS-1_hdfsConvertToJsonAndEncrypt");
FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false);
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -522,7 +515,7 @@ public class MultiHopFlowCompilerTest {
@Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
public void testCompileFlowAfterSecondEdgeDeletion() throws URISyntaxException, IOException {
//Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
+ this.flowGraph.get().deleteFlowEdge("HDFS-2_HDFS-2_hdfsConvertToJsonAndEncrypt");
FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false);
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
@@ -674,67 +667,6 @@ public class MultiHopFlowCompilerTest {
spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains("Flowgraph does not have a node with id"));
}
- @Test (dependsOnMethods = "testMissingDestinationNodeError")
- public void testGitFlowGraphMonitorService()
- throws IOException, GitAPIException, URISyntaxException, InterruptedException {
- File remoteDir = new File(TESTDIR + "/remote");
- File cloneDir = new File(TESTDIR + "/clone");
- File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
-
- //Clean up
- cleanUpDir(TESTDIR);
-
- // Create a bare repository
- RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
- Repository remoteRepo = fileKey.open(false);
- remoteRepo.create(true);
-
- Git gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
- // push an empty commit as a base for detecting changes
- gitForPush.commit().setMessage("First commit").call();
- RefSpec masterRefSpec = new RefSpec("master");
- gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
-
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
- Config config = ConfigBuilder.create()
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
- + ConfigurationKeys.GIT_MONITOR_REPO_URI, remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TESTDIR + "/git-flowgraph")
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
- .addPrimitive(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString())
- .build();
-
- //Create a MultiHopFlowCompiler instance
- specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false);
-
- specCompiler.setActive(true);
-
- //Ensure node1 is not present in the graph
- Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));
-
- // push a new node file
- File nodeDir = new File(flowGraphDir, "node1");
- File nodeFile = new File(nodeDir, "node1.properties");
- nodeDir.mkdirs();
- nodeFile.createNewFile();
- Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=val1" + "\n", nodeFile, Charsets.UTF_8);
-
- // add, commit, push node
- gitForPush.add().addFilepattern(formNodeFilePath(flowGraphDir, nodeDir.getName(), nodeFile.getName())).call();
- gitForPush.commit().setMessage("Node commit").call();
- gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
-
- // polling is every 5 seconds, so wait twice as long and check
- TimeUnit.SECONDS.sleep(10);
-
- //Test that a DataNode is added to FlowGraph
- DataNode dataNode = specCompiler.getFlowGraph().getNode("node1");
- Assert.assertEquals(dataNode.getId(), "node1");
- Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "val1");
- }
-
private String formNodeFilePath(File flowGraphDir, String groupDir, String fileName) {
return flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
new file mode 100644
index 000000000..0c0799c23
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+ private static final Logger logger = LoggerFactory.getLogger(FsFlowGraphMonitorTest.class);
+ private final File TEST_DIR = new File(FileUtils.getTempDirectory(), "fsFlowGraphTestDir");
+ private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+ private static final String NODE_1_FILE = "node1.properties";
+ private final File node1Dir = new File(FileUtils.getTempDirectory(), "node1");
+ private final File node1File = new File(node1Dir, NODE_1_FILE);
+ private static final String NODE_2_FILE = "node2.properties";
+ private final File node2Dir = new File(FileUtils.getTempDirectory(), "node2");
+ private final File node2File = new File(node2Dir, NODE_2_FILE);
+ private final File edge1Dir = new File(node1Dir, "node2");
+ private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+ private RefSpec masterRefSpec = new RefSpec("master");
+ private Optional<FSFlowTemplateCatalog> flowCatalog;
+ private Config config;
+ private AtomicReference<FlowGraph> flowGraph;
+ private FsFlowGraphMonitor flowGraphMonitor;
+ private Map<URI, TopologySpec> topologySpecMap;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ cleanUpDir(TEST_DIR.toString());
+
+
+ URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+ this.topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
+ this.config = ConfigBuilder.create()
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
+ + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, this.flowGraphDir.getAbsolutePath())
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
+ .build();
+
+ // Create a FSFlowTemplateCatalog instance
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ Properties properties = new Properties();
+ this.flowGraphDir.mkdirs();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
+
+ //Create a FlowGraph instance with defaults
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+ MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+ this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, this.flowCatalog, mhfc, topologySpecMap, new CountDownLatch(1), true);
+ this.flowGraphMonitor.startUp();
+ this.flowGraphMonitor.setActive(true);
+ }
+
+ @Test
+ public void testAddNode() throws Exception {
+ String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value1\n";
+ String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value2\n";
+
+ addNode(this.node1Dir, this.node1File, file1Contents);
+ addNode(this.node2Dir, this.node2File, file2Contents);
+
+ // Let the monitor pick up the nodes that were recently added
+ Thread.sleep(3000);
+ for (int i = 0; i < 2; i++) {
+ String nodeId = "node" + (i + 1);
+ String paramKey = "param" + (i + 1);
+ String paramValue = "value" + (i + 1);
+ //Check if nodes have been added to the FlowGraph
+ DataNode dataNode = this.flowGraph.get().getNode(nodeId);
+ Assert.assertEquals(dataNode.getId(), nodeId);
+ Assert.assertTrue(dataNode.isActive());
+ Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), paramValue);
+ }
+ }
+
+ @Test (dependsOnMethods = "testAddNode")
+ public void testAddEdge() throws Exception {
+ //Build contents of edge file
+ String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value1");
+ addEdge(this.edge1Dir, this.edge1File, fileContents);
+ // Let the monitor pick up the edges that were recently added
+ Thread.sleep(3000);
+
+ //Check if edge1 has been added to the FlowGraph
+ testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
+ }
+
+ @Test (dependsOnMethods = "testAddEdge")
+ public void testUpdateEdge() throws Exception {
+ //Rewrite the edge1 file with a new value for key1
+ String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value2");
+ addEdge(this.edge1Dir, this.edge1File, fileContents);
+ // Let the monitor pick up the edge file that was just updated
+ Thread.sleep(3000);
+
+ //Check that edge1 in the FlowGraph now carries the updated value
+ testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value2");
+ }
+
+ @Test (dependsOnMethods = "testUpdateEdge")
+ public void testUpdateNode() throws Exception {
+ //Update param1 value in node1 and check if updated node is added to the graph
+ String fileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value3\n";
+ addNode(this.node1Dir, this.node1File, fileContents);
+ // Let the monitor pick up the node that was just updated
+ Thread.sleep(3000);
+ //Check if node has been updated in the FlowGraph
+ DataNode dataNode = this.flowGraph.get().getNode("node1");
+ Assert.assertEquals(dataNode.getId(), "node1");
+ Assert.assertTrue(dataNode.isActive());
+ Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3");
+ }
+
+ @Test (dependsOnMethods = "testUpdateNode")
+ public void testSetUpExistingGraph() throws Exception {
+ // Create a FlowGraph instance with defaults
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+ MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+
+ this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, this.flowCatalog, mhfc, this.topologySpecMap, new CountDownLatch(1), true);
+ this.flowGraphMonitor.startUp();
+ this.flowGraphMonitor.setActive(true);
+
+ // Let the monitor repopulate the flowgraph
+ Thread.sleep(3000);
+
+ Assert.assertNotNull(this.flowGraph.get().getNode("node1"));
+ Assert.assertNotNull(this.flowGraph.get().getNode("node2"));
+ Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+
+ }
+
+ @Test (dependsOnMethods = "testSetUpExistingGraph")
+ public void testRemoveEdge() throws Exception {
+ //Node1 has 1 edge before delete
+ Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 1);
+ File edgeFile = new File(this.flowGraphDir.getAbsolutePath(), node1Dir.getName() + Path.SEPARATOR_CHAR + edge1Dir.getName() + Path.SEPARATOR_CHAR + edge1File.getName());
+
+ edgeFile.delete();
+ // Let the monitor pick up the edges that were recently deleted
+ Thread.sleep(3000);
+
+ //Check if edge1 has been deleted from the graph
+ edgeSet = this.flowGraph.get().getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testRemoveEdge")
+ public void testRemoveNode() throws Exception {
+ //Ensure node1 and node2 are present in the graph before delete
+ DataNode node1 = this.flowGraph.get().getNode("node1");
+ Assert.assertNotNull(node1);
+ DataNode node2 = this.flowGraph.get().getNode("node2");
+ Assert.assertNotNull(node2);
+
+
+ File node1FlowGraphFile = new File(this.flowGraphDir.getAbsolutePath(), node1Dir.getName());
+ File node2FlowGraphFile = new File(this.flowGraphDir.getAbsolutePath(), node2Dir.getName());
+ //delete the node directories from the flowgraph directory
+ FileUtils.deleteDirectory(node1FlowGraphFile);
+ FileUtils.deleteDirectory(node2FlowGraphFile);
+
+ // Let the monitor pick up the nodes that were recently deleted
+ Thread.sleep(3000);
+
+ //Check if node1 and node2 have been deleted from the graph
+ node1 = this.flowGraph.get().getNode("node1");
+ Assert.assertNull(node1);
+ node2 = this.flowGraph.get().getNode("node2");
+ Assert.assertNull(node2);
+ }
+
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ cleanUpDir(TEST_DIR.toString());
+ }
+
+ private void createNewFile(File dir, File file, String fileContents) throws IOException {
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ file.createNewFile();
+ Files.write(fileContents, file, Charsets.UTF_8);
+ }
+
+ private void addNode(File nodeDir, File nodeFile, String fileContents) throws IOException {
+ createNewFile(nodeDir, nodeFile, fileContents);
+ File destinationFile = new File(this.flowGraphDir.getAbsolutePath(), nodeDir.getName() + Path.SEPARATOR_CHAR + nodeFile.getName());
+ logger.info(destinationFile.toString());
+ if (destinationFile.exists()) {
+ // clear file
+ Files.write(new byte[0], destinationFile);
+ Files.write(fileContents, destinationFile, Charsets.UTF_8);
+ } else {
+ FileUtils.moveDirectory(nodeDir, destinationFile.getParentFile());
+ }
+ }
+
+ private void addEdge(File edgeDir, File edgeFile, String fileContents) throws Exception {
+ createNewFile(edgeDir, edgeFile, fileContents);
+ File destinationFile = new File(this.flowGraphDir.getAbsolutePath(), edgeDir.getParentFile().getName() + Path.SEPARATOR_CHAR + edgeDir.getName() + Path.SEPARATOR_CHAR + edgeFile.getName());
+ if (destinationFile.exists()) {
+ // clear old properties file
+ Files.write(new byte[0], destinationFile);
+ Files.write(fileContents, destinationFile, Charsets.UTF_8);
+ } else {
+ FileUtils.moveDirectory(edgeDir, destinationFile.getParentFile());
+ }
+ }
+
+ private String buildEdgeFileContents(String node1, String node2, String edgeName, String value) {
+ String fileContents = FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=" + node1 + "\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=" + node2 + "\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "=testExecutor1,testExecutor2\n"
+ + "key1=" + value + "\n";
+ return fileContents;
+ }
+
+ private void testIfEdgeSuccessfullyAdded(String node1, String node2, String edgeName, String value) throws ExecutionException, InterruptedException {
+ Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges(node1);
+ Assert.assertEquals(edgeSet.size(), 1);
+ FlowEdge flowEdge = edgeSet.iterator().next();
+ Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2, edgeName));
+ Assert.assertEquals(flowEdge.getSrc(), node1);
+ Assert.assertEquals(flowEdge.getDest(), node2);
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getConfig().getString("key1"), value);
+ }
+
+ private void cleanUpDir(String dir) {
+ File specStoreDir = new File(dir);
+
+ // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+ for (int i = 0; i < 5; i++) {
+ try {
+ if (specStoreDir.exists()) {
+ FileUtils.deleteDirectory(specStoreDir);
+ }
+ // if delete succeeded then break out of loop
+ break;
+ } catch (IOException e) {
+ logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
index efa34e1ec..e6cd3354b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
@@ -21,14 +21,18 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.dircache.DirCache;
@@ -83,7 +87,7 @@ public class GitFlowGraphMonitorTest {
private RefSpec masterRefSpec = new RefSpec("master");
private Optional<FSFlowTemplateCatalog> flowCatalog;
private Config config;
- private BaseFlowGraph flowGraph;
+ private AtomicReference<FlowGraph> flowGraph;
private GitFlowGraphMonitor gitFlowGraphMonitor;
@BeforeClass
@@ -120,11 +124,10 @@ public class GitFlowGraphMonitorTest {
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
this.flowCatalog = Optional.of(new FSFlowTemplateCatalog(templateCatalogCfg));
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+ MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
- //Create a FlowGraph instance with defaults
- this.flowGraph = new BaseFlowGraph();
-
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, mhfc, topologySpecMap, new CountDownLatch(1), true);
this.gitFlowGraphMonitor.setActive(true);
}
@@ -143,7 +146,7 @@ public class GitFlowGraphMonitorTest {
String paramKey = "param" + (i + 1);
String paramValue = "value" + (i + 1);
//Check if nodes have been added to the FlowGraph
- DataNode dataNode = this.flowGraph.getNode(nodeId);
+ DataNode dataNode = this.flowGraph.get().getNode(nodeId);
Assert.assertEquals(dataNode.getId(), nodeId);
Assert.assertTrue(dataNode.isActive());
Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), paramValue);
@@ -191,7 +194,7 @@ public class GitFlowGraphMonitorTest {
this.gitFlowGraphMonitor.processGitConfigChanges();
//Check if node has been updated in the FlowGraph
- DataNode dataNode = this.flowGraph.getNode("node1");
+ DataNode dataNode = this.flowGraph.get().getNode("node1");
Assert.assertEquals(dataNode.getId(), "node1");
Assert.assertTrue(dataNode.isActive());
Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3");
@@ -204,7 +207,7 @@ public class GitFlowGraphMonitorTest {
edge1File.delete();
//Node1 has 1 edge before delete
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+ Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
Assert.assertEquals(edgeSet.size(), 1);
// delete, commit, push
@@ -216,7 +219,7 @@ public class GitFlowGraphMonitorTest {
this.gitFlowGraphMonitor.processGitConfigChanges();
//Check if edge1 has been deleted from the graph
- edgeSet = this.flowGraph.getEdges("node1");
+ edgeSet = this.flowGraph.get().getEdges("node1");
Assert.assertTrue(edgeSet.size() == 0);
}
@@ -227,9 +230,9 @@ public class GitFlowGraphMonitorTest {
node2File.delete();
//Ensure node1 and node2 are present in the graph before delete
- DataNode node1 = this.flowGraph.getNode("node1");
+ DataNode node1 = this.flowGraph.get().getNode("node1");
Assert.assertNotNull(node1);
- DataNode node2 = this.flowGraph.getNode("node2");
+ DataNode node2 = this.flowGraph.get().getNode("node2");
Assert.assertNotNull(node2);
// delete, commit, push
@@ -241,9 +244,9 @@ public class GitFlowGraphMonitorTest {
this.gitFlowGraphMonitor.processGitConfigChanges();
//Check if node1 and node 2 have been deleted from the graph
- node1 = this.flowGraph.getNode("node1");
+ node1 = this.flowGraph.get().getNode("node1");
Assert.assertNull(node1);
- node2 = this.flowGraph.getNode("node2");
+ node2 = this.flowGraph.get().getNode("node2");
Assert.assertNull(node2);
}
@@ -270,9 +273,9 @@ public class GitFlowGraphMonitorTest {
this.gitFlowGraphMonitor.processGitConfigChanges();
//Ensure node1 and node2 are present in the graph
- DataNode node1 = this.flowGraph.getNode("node1");
+ DataNode node1 = this.flowGraph.get().getNode("node1");
Assert.assertNotNull(node1);
- DataNode node2 = this.flowGraph.getNode("node2");
+ DataNode node2 = this.flowGraph.get().getNode("node2");
Assert.assertNotNull(node2);
testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
@@ -293,9 +296,9 @@ public class GitFlowGraphMonitorTest {
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
this.gitFlowGraphMonitor.processGitConfigChanges();
- node1 = this.flowGraph.getNode("node1");
+ node1 = this.flowGraph.get().getNode("node1");
Assert.assertNotNull(node1);
- Assert.assertEquals(this.flowGraph.getEdges(node1).size(), 0);
+ Assert.assertEquals(this.flowGraph.get().getEdges(node1).size(), 0);
}
@AfterClass
@@ -339,7 +342,7 @@ public class GitFlowGraphMonitorTest {
}
private void testIfEdgeSuccessfullyAdded(String node1, String node2, String edgeName, String value) throws ExecutionException, InterruptedException {
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1);
+ Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges(node1);
Assert.assertEquals(edgeSet.size(), 1);
FlowEdge flowEdge = edgeSet.iterator().next();
Assert.assertEquals(flowEdge.getId(), Joiner.on("_").join(node1, node2, edgeName));
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
index 994240690..e0d022342 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
@@ -107,7 +107,7 @@ public class CallbacksDispatcher<L> implements Closeable {
public synchronized void addListener(L listener) {
Preconditions.checkNotNull(listener);
- _log.info("Adding listener:" + listener);
+ _log.debug("Adding listener:" + listener);
_listeners.add(listener);
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
index 4989275a6..00c8472d5 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.util.filesystem;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
import org.apache.hadoop.fs.Path;
import org.apache.gobblin.util.Decorator;
@@ -42,73 +45,83 @@ public class ExceptionCatchingPathAlterationListenerDecorator implements PathAlt
@Override
public void onStart(PathAlterationObserver observer) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onStart(observer);
- } catch (Throwable exc) {
- log.error("onStart failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileCreate(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileCreate(path);
- } catch (Throwable exc) {
- log.error("onFileCreate failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileChange(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileChange(path);
- } catch (Throwable exc) {
- log.error("onFileChange failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onStop(PathAlterationObserver observer) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onStop(observer);
- } catch (Throwable exc) {
- log.error("onStop failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryCreate(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryCreate(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryCreate failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryChange(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryChange(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryChange failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryDelete(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryDelete(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryDelete failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileDelete(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileDelete(path);
+ return null;
+ });
+ }
+
+ @Override
+ public void onCheckDetectedChange() {
+ logSwallowedThrowable(() -> {
+ this.underlying.onCheckDetectedChange();
+ return null;
+ });
+ }
+
+ protected void logSwallowedThrowable(Callable<Void> c) {
+ try {
+ c.call();
 } catch (Throwable exc) {
- log.error("onFileDelete failure: ", exc);
+ String methodName = Arrays.stream(exc.getStackTrace()).findFirst().map(StackTraceElement::getMethodName).orElse("<unknown>"); // stack trace may legally be empty; never throw from this catch block
+ log.error(methodName + " failed due to exception:", exc);
 }
 }
+
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
index fa4d94087..d271c9198 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java
@@ -43,4 +43,10 @@ public interface PathAlterationListener {
void onDirectoryDelete(final Path directory);
void onFileDelete(final Path path);
+
+ /**
+ * Invoked after the other change events of a poll have been processed, if any file or directory was created, modified or deleted.
+ * Invoked at most once per poll; see checkAndNotify() in {@link PathAlterationObserver}.
+ */
+ void onCheckDetectedChange();
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
index cf5891a41..0a1ed6e6f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListenerAdaptor.java
@@ -44,4 +44,7 @@ public class PathAlterationListenerAdaptor implements PathAlterationListener {
public void onFileDelete(final Path path) {
}
+
+ public void onCheckDetectedChange() {
+ }
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
index 6fcb44374..1d6dbd0e2 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
@@ -43,9 +43,9 @@ public class PathAlterationObserver {
private final PathFilter pathFilter;
private final Comparator<Path> comparator;
private final FileSystem fs;
-
private final Path[] EMPTY_PATH_ARRAY = new Path[0];
+ private boolean changeApplied = false;
/**
* Final processing.
*/
@@ -163,13 +163,15 @@ public class PathAlterationObserver {
/**
* Check whether the file and its children have been created, modified or deleted.
*/
- public void checkAndNotify()
+ public synchronized void checkAndNotify()
throws IOException {
+ // If any files or directories are modified this flag will be set to true
+ this.changeApplied = false;
+
/* fire onStart() */
for (final PathAlterationListener listener : listeners.values()) {
listener.onStart(this);
}
-
/* fire directory/file events */
final Path rootPath = rootEntry.getPath();
@@ -183,10 +185,18 @@ public class PathAlterationObserver {
// Didn't exist and still doesn't
}
+ if (this.changeApplied) {
+ for (final PathAlterationListener listener : listeners.values()) {
+ // Fire onCheckDetectedChange to notify when one check contains any number of changes
+ listener.onCheckDetectedChange();
+ }
+ }
+
/* fire onStop() */
for (final PathAlterationListener listener : listeners.values()) {
listener.onStop(this);
}
+
}
/**
@@ -196,7 +206,7 @@ public class PathAlterationObserver {
* @param previous The original list of paths
* @param currentPaths The current list of paths
*/
- private void checkAndNotify(final FileStatusEntry parent, final FileStatusEntry[] previous, final Path[] currentPaths)
+ private synchronized void checkAndNotify(final FileStatusEntry parent, final FileStatusEntry[] previous, final Path[] currentPaths)
throws IOException {
int c = 0;
@@ -265,8 +275,8 @@ public class PathAlterationObserver {
*
* @param entry The file entry
*/
- private void doCreate(final FileStatusEntry entry) {
-
+ protected synchronized void doCreate(final FileStatusEntry entry) {
+ this.changeApplied = true;
for (final PathAlterationListener listener : listeners.values()) {
if (entry.isDirectory()) {
listener.onDirectoryCreate(entry.getPath());
@@ -286,9 +296,10 @@ public class PathAlterationObserver {
* @param entry The previous file system entry
* @param path The current file
*/
- private void doMatch(final FileStatusEntry entry, final Path path)
+ private synchronized void doMatch(final FileStatusEntry entry, final Path path)
throws IOException {
if (entry.refresh(path)) {
+ this.changeApplied = true;
for (final PathAlterationListener listener : listeners.values()) {
if (entry.isDirectory()) {
listener.onDirectoryChange(path);
@@ -304,8 +315,9 @@ public class PathAlterationObserver {
*
* @param entry The file entry
*/
- private void doDelete(final FileStatusEntry entry) {
- for (final PathAlterationListener listener : listeners.values()) {
+ private synchronized void doDelete(final FileStatusEntry entry) {
+ this.changeApplied = true;
+ for (final PathAlterationListener listener : listeners.values()) {
if (entry.isDirectory()) {
listener.onDirectoryDelete(entry.getPath());
} else {