You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/20 23:01:15 UTC
[gobblin] branch master updated: [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 635e81f8b [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)
635e81f8b is described below
commit 635e81f8bcf10099cddf93d4e35e23c905fc840c
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Oct 20 16:01:09 2022 -0700
[GOBBLIN-1724] Support a shared flowgraph layout in GaaS (#3583)
* Add shared flowgraph format and helper
* Use static variables for constants
* Handle scenario where test dir can be at root
---
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../modules/flowgraph/BaseFlowGraphHelper.java | 58 ++++++----
.../modules/flowgraph/SharedFlowGraphHelper.java | 126 +++++++++++++++++++++
.../service/monitoring/FsFlowGraphMonitor.java | 22 +++-
.../service/monitoring/FsFlowGraphMonitorTest.java | 50 ++++++--
5 files changed, 223 insertions(+), 34 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index d9c251e69..1aa6e30dc 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -52,6 +52,7 @@ public class ServiceConfigKeys {
public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName";
public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec";
public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraph.class";
+ public static final String GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraphHelper.class";
// Helix message sub types for FlowSpec
public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 4f0b82779..3e705a30b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -57,11 +57,21 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
* Provides the common set of functionalities needed by {@link FlowGraphMonitor} to read changes in files and
* apply them to a {@link FlowGraph}
* Assumes that the directory structure of the configuration files is the same across flowgraphs.
+ *
+ * Assumes that the flowgraph follows this layout:
+ * <pre>
+ * /gobblin-flowgraph
+ *   /nodeA
+ *     /nodeB
+ *       edgeAB.properties
+ *     A.properties
+ *   /nodeB
+ *     B.properties
+ * </pre>
+ *
*/
@Slf4j
public class BaseFlowGraphHelper {
- public static final int NODE_FILE_DEPTH = 3;
- public static final int EDGE_FILE_DEPTH = 4;
+ private static final int NODE_FILE_DEPTH = 3;
+ private static final int EDGE_FILE_DEPTH = 4;
private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
final String baseDirectory;
@@ -69,12 +79,12 @@ public class BaseFlowGraphHelper {
private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
private final Map<URI, TopologySpec> topologySpecMap;
- private MetricContext metricContext;
+ protected MetricContext metricContext;
final String flowGraphFolderName;
final PullFileLoader pullFileLoader;
final Set<String> javaPropsExtensions;
final Set<String> hoconFileExtensions;
- private final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
+ protected final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
@@ -106,9 +116,9 @@ public class BaseFlowGraphHelper {
* to instantiate a {@link DataNode} from the node config file.
* @param path of node to add
*/
- protected void addDataNode(FlowGraph graph, String path) {
- if (checkFilePath(path, NODE_FILE_DEPTH)) {
- Path nodeFilePath = new Path(this.baseDirectory, path);
+ protected void addDataNode(FlowGraph graph, java.nio.file.Path path) {
+ if (!java.nio.file.Files.isDirectory(path) && checkFilePath(path.toString(), getNodeFileDepth())) {
+ Path nodeFilePath = new Path(this.baseDirectory, path.toString());
try {
Config config = loadNodeFileWithOverrides(nodeFilePath);
Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
@@ -133,9 +143,9 @@ public class BaseFlowGraphHelper {
* provided by the {@link FlowGraph} to build a {@link FlowEdge} from the edge config file.
* @param path of edge to add
*/
- protected void addFlowEdge(FlowGraph graph, String path) {
- if (checkFilePath(path, EDGE_FILE_DEPTH)) {
- Path edgeFilePath = new Path(this.baseDirectory, path);
+ protected void addFlowEdge(FlowGraph graph, java.nio.file.Path path) {
+ if (!java.nio.file.Files.isDirectory(path) && checkFilePath(path.toString(), getEdgeFileDepth())) {
+ Path edgeFilePath = new Path(this.baseDirectory, path.toString());
try {
Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
@@ -168,7 +178,7 @@ public class BaseFlowGraphHelper {
* @param file the relative path from the root of the flowgraph
* @return false if the file does not conform
*/
- private boolean checkFilePath(String file, int depth) {
+ protected boolean checkFilePath(String file, int depth) {
// The file is either a node file or an edge file and needs to be stored at either:
// flowGraphDir/nodeName/nodeName.properties (if it is a node file), or
// flowGraphDir/nodeName/nodeName/edgeName.properties (if it is an edge file)
@@ -197,7 +207,7 @@ public class BaseFlowGraphHelper {
for (int i = 0; i < depth - 1; i++) {
path = path.getParent();
}
- return path.getName().equals(flowGraphFolderName);
+ return path != null ? path.getName().equals(flowGraphFolderName) : false;
}
/**
@@ -285,20 +295,18 @@ public class BaseFlowGraphHelper {
FlowGraph newFlowGraph = new BaseFlowGraph();
java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
try {
- List<Path> edges = new ArrayList<>();
+ List<java.nio.file.Path> edges = new ArrayList<>();
// All nodes must be added first before edges, otherwise edges may have a missing source or destination.
// Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
java.nio.file.Files.walk(graphPath).forEach(fileName -> {
- if (!java.nio.file.Files.isDirectory(fileName)) {
- if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
- addDataNode(newFlowGraph, fileName.toString());
- } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
- edges.add(new Path(fileName.toString()));
- }
+ if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), getNodeFileDepth())) {
+ addDataNode(newFlowGraph, fileName);
+ } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), getEdgeFileDepth())) {
+ edges.add(fileName);
}
});
- for (Path edge : edges) {
- addFlowEdge(newFlowGraph, edge.toString());
+ for (java.nio.file.Path edge : edges) {
+ addFlowEdge(newFlowGraph, edge);
}
return newFlowGraph;
} catch (IOException e) {
@@ -321,4 +329,12 @@ public class BaseFlowGraphHelper {
public String getEdgeId(String source, String destination, String edgeName) {
return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(source, destination, edgeName);
}
+
+ /**
+ * Returns the depth used when checking whether a path is a node definition; this class passes it to
+ * {@code checkFilePath} and {@code checkFileLevelRelativeToRoot}.
+ * Declared protected so that a subclass with a different directory layout can supply its own depth.
+ * @return {@code NODE_FILE_DEPTH}
+ */
+ protected int getNodeFileDepth() {
+ return NODE_FILE_DEPTH;
+ }
+
+ /**
+ * Returns the depth used when checking whether a path is an edge definition; this class passes it to
+ * {@code checkFilePath} and {@code checkFileLevelRelativeToRoot}.
+ * Declared protected so that a subclass with a different directory layout can supply its own depth.
+ * @return {@code EDGE_FILE_DEPTH}
+ */
+ protected int getEdgeFileDepth() {
+ return EDGE_FILE_DEPTH;
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
new file mode 100644
index 000000000..ce0c98b3d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Supports a flowgraph layout in which one directory holds multiple sub-flowgraphs.
+ * Node definitions are shared between the subgraphs, but can be overridden within a subgraph.
+ * Edge definitions are only defined in the subgraphs.
+ * e.g.
+ * <pre>
+ * /gobblin-flowgraph-absolute-dir
+ *   /subgraphA
+ *     /nodeA                    (NODE_FOLDER_DEPTH)
+ *       /nodeB
+ *         edgeAB.properties
+ *   /subgraphB
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties     (EDGE_FILE_DEPTH)
+ *       A.properties            (NODE_FILE_DEPTH)
+ *   /nodes
+ *     A.properties
+ *     B.properties
+ * </pre>
+ */
+@Slf4j
+public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
+
+  private static final String NODE_FILE_SUFFIX = ".properties";
+  private static final String SHARED_NODE_FOLDER_NAME = "nodes";
+  private static final int NODE_FOLDER_DEPTH = 2;
+
+  /** Path of the folder holding the shared node definitions: {@code baseDirectory/nodes}. */
+  protected String sharedNodeFolder;
+
+  /**
+   * Forwards every argument unchanged to {@link BaseFlowGraphHelper} and resolves the shared node folder
+   * as the {@code nodes} directory directly under {@code baseDirectory}.
+   */
+  public SharedFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, boolean instrumentationEnabled, Config config) {
+    super(flowTemplateCatalog, topologySpecMap, baseDirectory, flowGraphFolderName, javaPropsExtentions,
+        hoconFileExtensions, instrumentationEnabled, config);
+    this.sharedNodeFolder = baseDirectory + File.separator + SHARED_NODE_FOLDER_NAME;
+  }
+
+  /**
+   * Builds a {@link DataNode} for a node folder and adds it to the graph.
+   * The node's property file is named after the folder (folder name + {@code .properties}). It is looked up
+   * in the shared node folder and in the node folder itself; values from the node folder take precedence and
+   * the shared values act as fallbacks. If neither lookup yields any configuration the node is not added.
+   * Failures are not propagated: they mark the update-failed meter (when present) and are logged.
+   * @param graph the {@link FlowGraph} the node is added to
+   * @param path of node folder in the subgraph, so path is expected to be a directory
+   */
+  @Override
+  protected void addDataNode(FlowGraph graph, java.nio.file.Path path) {
+    try {
+      String nodePropertyFile = path.getFileName().toString() + NODE_FILE_SUFFIX;
+
+      // Load node from shared folder first if it exists
+      Config sharedNodeConfig = ConfigFactory.empty();
+      File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
+      if (sharedNodeFile.exists()) {
+        sharedNodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
+      }
+
+      // A definition inside the subgraph's node folder overrides the shared values
+      Config nodeConfig = sharedNodeConfig;
+      File nodeFilePath = new File(path.toString(), nodePropertyFile);
+      if (nodeFilePath.exists()) {
+        nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(sharedNodeConfig);
+      }
+
+      if (nodeConfig.isEmpty()) {
+        throw new IOException(String.format("Cannot find expected property file %s in %s or %s", nodePropertyFile, sharedNodeFolder, path));
+      }
+
+      Class<?> dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+      DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+      if (!graph.addDataNode(dataNode)) {
+        log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
+      } else {
+        log.info("Added Datanode {} to FlowGraph", dataNode.getId());
+      }
+    } catch (Exception e) {
+      if (this.flowGraphUpdateFailedMeter.isPresent()) {
+        this.flowGraphUpdateFailedMeter.get().mark();
+      }
+      // One placeholder for the path; the trailing throwable is logged by SLF4J with its stack trace
+      log.warn("Could not add DataNode defined in {} due to exception", path, e);
+    }
+  }
+
+  /**
+   * Sets the data node id on the given config to the node file's name with its extension removed.
+   * @param nodeConfig the config loaded from the node file
+   * @param nodeFilePath path of the node file the config was loaded from
+   * @return a copy of {@code nodeConfig} carrying the derived id under {@link FlowGraphConfigurationKeys#DATA_NODE_ID_KEY}
+   */
+  @Override
+  protected Config getNodeConfigWithOverrides(Config nodeConfig, Path nodeFilePath) {
+    String nodeId = FilenameUtils.removeExtension(nodeFilePath.getName());
+    return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(nodeId));
+  }
+
+  /**
+   * Nodes are identified by their folder in this layout, so the node depth is that of the node folder.
+   * @return {@code NODE_FOLDER_DEPTH}
+   */
+  @Override
+  protected int getNodeFileDepth() {
+    return NODE_FOLDER_DEPTH;
+  }
+}
+
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
index 6183e7d66..bda777ae3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Optional;
@@ -35,11 +37,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
@@ -74,13 +79,18 @@ public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraph
TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
this.observer = new PathAlterationObserver(flowGraphPath);
- this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
- configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
- configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+ try {
+ String helperClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY,
+ BaseFlowGraphHelper.class.getCanonicalName());
+ this.flowGraphHelper = (BaseFlowGraphHelper) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(BaseFlowGraphHelper.class).resolve(helperClassName)), flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS), instrumentationEnabled, config);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, compiler, flowGraphPath.toString(), this.flowGraphHelper);
-
- this.compiler = compiler;
- this.initComplete = initComplete;
+ this.compiler = compiler;
+ this.initComplete = initComplete;
if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
this.pathAlterationDetector = null;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 0c0799c23..1a4a36eff 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -23,6 +23,8 @@ import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -68,6 +70,7 @@ public class FsFlowGraphMonitorTest {
private final File node2File = new File(node2Dir, NODE_2_FILE);
private final File edge1Dir = new File(node1Dir, "node2");
private final File edge1File = new File(edge1Dir, "edge1.properties");
+ private final File sharedNodeFolder = new File(TEST_DIR, "nodes");
private RefSpec masterRefSpec = new RefSpec("master");
private Optional<FSFlowTemplateCatalog> flowCatalog;
@@ -79,14 +82,14 @@ public class FsFlowGraphMonitorTest {
@BeforeClass
public void setUp() throws Exception {
cleanUpDir(TEST_DIR.toString());
-
+ TEST_DIR.mkdirs();
URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
this.topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
this.config = ConfigBuilder.create()
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
- + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, this.flowGraphDir.getAbsolutePath())
+ + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, TEST_DIR.getAbsolutePath())
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
.build();
@@ -173,6 +176,7 @@ public class FsFlowGraphMonitorTest {
@Test (dependsOnMethods = "testUpdateNode")
public void testSetUpExistingGraph() throws Exception {
// Create a FlowGraph instance with defaults
+ this.flowGraphMonitor.shutDown();
this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
@@ -186,10 +190,43 @@ public class FsFlowGraphMonitorTest {
Assert.assertNotNull(this.flowGraph.get().getNode("node1"));
Assert.assertNotNull(this.flowGraph.get().getNode("node2"));
Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
-
}
@Test (dependsOnMethods = "testSetUpExistingGraph")
+ public void testSharedFlowgraphHelper() throws Exception {
+ // Verifies that a monitor configured with SharedFlowGraphHelper builds nodes from the shared "nodes" folder.
+ // Stop the monitor left running by the previous test before a new one is created
+ this.flowGraphMonitor.shutDown();
+ // Same settings as this.config, plus the key that selects SharedFlowGraphHelper as the helper class
+ Config sharedFlowgraphConfig = ConfigFactory.empty()
+ .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY, ConfigValueFactory.fromAnyRef("org.apache.gobblin.service.modules.flowgraph.SharedFlowGraphHelper"))
+ .withFallback(this.config);
+
+
+ // Start from an empty flowgraph
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
+ // NOTE(review): the compiler is built from the base config, not sharedFlowgraphConfig - confirm this is intended
+ MultiHopFlowCompiler mhfc = new MultiHopFlowCompiler(config, this.flowGraph);
+ // Set up node 3: an empty node folder in the flowgraph dir, with its properties file placed in the shared folder
+ File node3Folder = new File(this.flowGraphDir, "node3");
+ node3Folder.mkdirs();
+ File node3File = new File(this.sharedNodeFolder, "node3.properties");
+ String file3Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam3=value3\n";
+
+ // Have different default values for node 1
+ File node1File = new File(this.sharedNodeFolder, "node1.properties");
+ String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value10\n";
+
+ // presumably createNewFile writes the given contents to the file inside the given folder - helper not shown here
+ createNewFile(this.sharedNodeFolder, node3File, file3Contents);
+ createNewFile(this.sharedNodeFolder, node1File, file1Contents);
+
+ this.flowGraphMonitor = new FsFlowGraphMonitor(sharedFlowgraphConfig, this.flowCatalog, mhfc, this.topologySpecMap, new CountDownLatch(1), true);
+ this.flowGraphMonitor.startUp();
+ this.flowGraphMonitor.setActive(true);
+ // Let the monitor repopulate the flowgraph
+ Thread.sleep(3000);
+ // node3 is defined only in the shared folder, so its presence shows the shared definition was used
+ Assert.assertNotNull(this.flowGraph.get().getNode("node3"));
+ DataNode node1 = this.flowGraph.get().getNode("node1");
+ Assert.assertTrue(node1.isActive());
+ // param2=value10 is the value written to the shared node1.properties above
+ Assert.assertEquals(node1.getRawConfig().getString("param2"), "value10");
+ }
+
+ @Test (dependsOnMethods = "testSharedFlowgraphHelper")
public void testRemoveEdge() throws Exception {
//Node1 has 1 edge before delete
Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");
@@ -219,7 +256,6 @@ public class FsFlowGraphMonitorTest {
//delete node files
FileUtils.deleteDirectory(node1FlowGraphFile);
FileUtils.deleteDirectory(node2FlowGraphFile);
-
// Let the monitor pick up the edges that were recently deleted
Thread.sleep(3000);
@@ -297,13 +333,13 @@ public class FsFlowGraphMonitorTest {
}
private void cleanUpDir(String dir) {
- File specStoreDir = new File(dir);
+ File dirToDelete = new File(dir);
// cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
for (int i = 0; i < 5; i++) {
try {
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
+ if (dirToDelete.exists()) {
+ FileUtils.deleteDirectory(dirToDelete);
}
// if delete succeeded then break out of loop
break;