You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/07 17:01:10 UTC
incubator-gobblin git commit: [GOBBLIN-646] Refactor
MultiHopFlowCompiler to use SpecExecutor configs from TopologySpecMap.[]
Repository: incubator-gobblin
Updated Branches:
refs/heads/master b2dd7ddfc -> ccaa02c6e
[GOBBLIN-646] Refactor MultiHopFlowCompiler to use SpecExecutor configs from TopologySpecMap.[]
Closes #2516 from sv2000/specExecutorsRefactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccaa02c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccaa02c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccaa02c6
Branch: refs/heads/master
Commit: ccaa02c6e132a5731706a61620f04abd89be75fe
Parents: b2dd7dd
Author: suvasude <su...@linkedin.biz>
Authored: Fri Dec 7 09:01:05 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Dec 7 09:01:05 2018 -0800
----------------------------------------------------------------------
.../modules/core/GitFlowGraphMonitor.java | 39 ++++++++++++--
.../modules/flow/MultiHopFlowCompiler.java | 2 +-
.../service/modules/flowgraph/BaseFlowEdge.java | 27 ++++------
.../modules/flowgraph/FlowEdgeFactory.java | 7 ++-
.../pathfinder/AbstractPathFinder.java | 9 ++--
.../modules/core/GitFlowGraphMonitorTest.java | 19 ++++---
.../modules/flow/MultiHopFlowCompilerTest.java | 54 ++++++++++++++++++--
.../flowgraph/BaseFlowEdgeFactoryTest.java | 21 +++++---
.../adls-1-to-adls-1-retention-1.properties | 6 +--
.../adls-1-to-adls-1-retention-2.properties | 6 +--
.../hdfs-1-to-hdfs-1-encrypt.properties | 6 +--
.../hdfs-1-to-hdfs-1-retention.properties | 6 +--
.../flowedges/hdfs-1-to-hdfs-3.properties | 6 +--
.../hdfs-2-hdfs-2-retention.properties | 6 +--
.../hdfs-2-to-hdfs-2-encrypt.properties | 6 +--
.../flowedges/hdfs-2-to-hdfs-4.properties | 6 +--
.../flowedges/hdfs-3-to-adls-1.properties | 7 +--
.../hdfs-3-to-hdfs-3-retention.properties | 6 +--
.../flowedges/hdfs-4-to-adls-1.properties | 6 +--
.../hdfs-4-to-hdfs-4-retention.properties | 6 +--
.../flowedges/local-to-hdfs-1.properties | 6 +--
.../flowedges/local-to-hdfs-2.properties | 6 +--
.../local-to-local-retention.properties | 6 +--
.../jobs/hdfs-encrypt-avro-to-json.job | 3 ++
.../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 5 +-
.../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 3 ++
.../localToHdfs/jobs/distcp-local-to-hdfs.job | 3 ++
.../distcp-push-hdfs-to-adl.template | 11 +---
.../multihop/jobTemplates/distcp.template | 11 +---
.../topologyspec_catalog/azkaban01.properties | 2 +
.../topologyspec_catalog/azkaban02.properties | 2 +
.../topologyspec_catalog/azkaban03.properties | 2 +
.../topologyspec_catalog/azkaban04.properties | 2 +
.../topologyspec_catalog/local01.properties | 2 +
.../testExecutor1.properties | 3 ++
.../testExecutor2.properties | 3 ++
36 files changed, 174 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index ed16c39..54544a1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -18,8 +18,12 @@
package org.apache.gobblin.service.modules.core;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
@@ -36,6 +40,8 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
@@ -82,12 +88,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
private FSFlowCatalog flowCatalog;
private FlowGraph flowGraph;
+ private final Map<URI, TopologySpec> topologySpecMap;
private final Config emptyConfig = ConfigFactory.empty();
- public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) {
+ public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
this.flowGraph = graph;
+ this.topologySpecMap = topologySpecMap;
}
/**
@@ -203,11 +211,12 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) {
Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath());
try {
- Config config = loadEdgeFileWithOverrides(edgeFilePath);
- Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+ Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
+ List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
+ Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
- FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
- FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog);
+ FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors);
if (!this.flowGraph.addFlowEdge(edge)) {
log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
} else {
@@ -304,12 +313,32 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
String source = edgeFilePath.getParent().getParent().getName();
String destination = edgeFilePath.getParent().getName();
String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName());
+
return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination))
.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
}
/**
+ * Retrieves the logical names of all the {@link org.apache.gobblin.runtime.api.SpecExecutor}s configured
+ * for this edge and returns the corresponding SpecExecutors looked up by name in the {@link TopologySpec} map.
+ * @param edgeConfig containing the logical names of SpecExecutors for this edge.
+ * @return a {@link List} of {@link SpecExecutor}s for this edge.
+ */
+ private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
+ throws URISyntaxException {
+ //Get the logical names of SpecExecutors where the FlowEdge can be executed.
+ List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+ //Look up the SpecExecutor for each logical name in the TopologySpec map.
+ List<SpecExecutor> specExecutors = new ArrayList<>();
+ for (String specExecutorName: specExecutorNames) {
+ URI specExecutorUri = new URI(specExecutorName);
+ specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
+ }
+ return specExecutors;
+ }
+
+ /**
* Load the node file.
* @param filePath path of the node file relative to the repository root
* @return the configuration object
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 50da32a..c1b3e84 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -94,7 +94,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
}
this.flowGraph = new BaseFlowGraph();
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap);
this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
addShutdownHook();
//Start the git flow graph monitor
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 56f6c1b..250fd57 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,23 +17,32 @@
package org.apache.gobblin.service.modules.flowgraph;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import joptsimple.internal.Strings;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -128,7 +137,7 @@ public class BaseFlowEdge implements FlowEdge {
* @return a {@link BaseFlowEdge}
*/
@Override
- public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
+ public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException {
try {
String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
@@ -138,30 +147,16 @@ public class BaseFlowEdge implements FlowEdge {
String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a non-null or empty Id");
- List<Config> specExecutorConfigList = new ArrayList<>();
- boolean flag;
- for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) {
- specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
- }
-
String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
//Perform basic validation
Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
Preconditions
- .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
+ .checkArgument(specExecutors.size() > 0, "A FlowEdge must have at least one SpecExecutor");
Preconditions
.checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
- //Build SpecExecutor from config
- List<SpecExecutor> specExecutors = new ArrayList<>();
-
- for (Config specExecutorConfig : specExecutorConfigList) {
- Class executorClass = Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY));
- SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
- specExecutors.add(executor);
- }
FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 2977231..3744bf0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -17,20 +17,23 @@
package org.apache.gobblin.service.modules.flowgraph;
+import java.util.List;
+
import com.typesafe.config.Config;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
public interface FlowEdgeFactory {
/**
* Construct a {@link FlowEdge} from the edge properties
* @param edgeProps properties of the {@link FlowEdge}
- * @param catalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
+ * @param flowCatalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s
* useful for creating a {@link FlowEdge}.
* @return an instance of {@link FlowEdge}
* @throws FlowEdgeCreationException
*/
- public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
+ public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException;
public class FlowEdgeCreationException extends Exception {
private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index f58a6d8..918f4a6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -181,7 +181,7 @@ public abstract class AbstractPathFinder implements PathFinder {
boolean foundExecutor = false;
//Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
- Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+ Config mergedConfig = getMergedConfig(flowEdge);
List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
@@ -285,21 +285,18 @@ public abstract class AbstractPathFinder implements PathFinder {
* <ul>
* <p> the user provided flow config </p>
* <p> edge specific properties/overrides </p>
- * <p> spec executor config/overrides </p>
* <p> source node config </p>
* <p> destination node config </p>
* </ul>
* Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
* @param flowEdge An instance of {@link FlowEdge}.
- * @param specExecutor A {@link SpecExecutor}.
* @return the merged config derived as described above.
*/
- private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+ private Config getMergedConfig(FlowEdge flowEdge)
throws ExecutionException, InterruptedException {
Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
- Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
- .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+ Config mergedConfig = flowConfig.withFallback(flowEdge.getConfig()).withFallback(srcNodeConfig).withFallback(destNodeConfig);
return mergedConfig;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index bd7d3cf..ab8af70 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -50,12 +51,14 @@ import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
public class GitFlowGraphMonitorTest {
@@ -96,6 +99,9 @@ public class GitFlowGraphMonitorTest {
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+ URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+ Map<URI, TopologySpec> topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
this.config = ConfigBuilder.create()
.addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+ ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
@@ -116,7 +122,7 @@ public class GitFlowGraphMonitorTest {
//Create a FlowGraph instance with defaults
this.flowGraph = new BaseFlowGraph();
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap);
this.gitFlowGraphMonitor.setActive(true);
}
@@ -325,14 +331,7 @@ public class GitFlowGraphMonitorTest {
+ FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n"
+ FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+ FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "=testExecutor1,testExecutor2\n"
+ "key1=" + value + "\n";
return fileContents;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 54792f9..6b51a77 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -25,8 +25,10 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
@@ -38,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.lib.Repository;
@@ -68,6 +71,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
@@ -83,6 +87,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -116,9 +121,11 @@ public class MultiHopFlowCompilerTest {
}
}
+ URI specExecutorCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+ Map<URI, TopologySpec> topologySpecMap = buildTopologySpecMap(specExecutorCatalogUri);
+
//Create a FSFlowCatalog instance
URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- // Create a FSFlowCatalog instance
Properties properties = new Properties();
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
Config config = ConfigFactory.parseProperties(properties);
@@ -127,7 +134,6 @@ public class MultiHopFlowCompilerTest {
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
//Add FlowEdges from the edge properties files
URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
fs = FileSystem.get(flowEdgesURI, new Configuration());
@@ -139,13 +145,55 @@ public class MultiHopFlowCompilerTest {
Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
- FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+ List<String> specExecutorNames = ConfigUtils.getStringList(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
+ List<SpecExecutor> specExecutors = new ArrayList<>();
+ for (String specExecutorName: specExecutorNames) {
+ specExecutors.add(topologySpecMap.get(new URI(specExecutorName)).getSpecExecutor());
+ }
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog, specExecutors);
this.flowGraph.addFlowEdge(edge);
}
}
this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
}
+ /**
+ * A helper method to return a {@link TopologySpec} map, given a {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}.
+ * @param topologyCatalogUri pointing to the location of the {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}
+ * @return a {@link TopologySpec} map.
+ */
+ public static Map<URI, TopologySpec> buildTopologySpecMap(URI topologyCatalogUri)
+ throws IOException, URISyntaxException, ReflectiveOperationException {
+ FileSystem fs = FileSystem.get(topologyCatalogUri, new Configuration());
+ PathFilter extensionFilter = file -> {
+ for (String extension : Lists.newArrayList(".properties")) {
+ if (file.getName().endsWith(extension)) {
+ return true;
+ }
+ }
+ return false;
+ };
+
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+ for (FileStatus fileStatus : fs.listStatus(new Path(topologyCatalogUri.getPath()), extensionFilter)) {
+ URI topologySpecUri = new URI(Files.getNameWithoutExtension(fileStatus.getPath().getName()));
+ Config topologyConfig = ConfigFactory.parseFile(new File(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+ Class specExecutorClass = Class.forName(topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_KEY));
+ SpecExecutor specExecutor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(specExecutorClass, topologyConfig);
+
+ TopologySpec.Builder topologySpecBuilder = TopologySpec
+ .builder(topologySpecUri)
+ .withConfig(topologyConfig)
+ .withDescription("")
+ .withVersion("1")
+ .withSpecExecutor(specExecutor);
+
+ TopologySpec topologySpec = topologySpecBuilder.build();
+ topologySpecMap.put(topologySpecUri, topologySpec);
+ }
+ return topologySpecMap;
+ }
+
private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination, boolean applyRetention, boolean applyRetentionOnInput)
throws IOException, URISyntaxException {
//Create a flow spec
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
index 2694f5c..085d0a7 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -18,14 +18,19 @@
package org.apache.gobblin.service.modules.flowgraph;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.util.ConfigUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -43,12 +48,14 @@ public class BaseFlowEdgeFactoryTest {
properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
+
+ List<SpecExecutor> specExecutorList = new ArrayList<>();
+ Config config1 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp1")).
+ withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s1:d1"));
+ specExecutorList.add(new InMemorySpecExecutor(config1));
+ Config config2 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp2")).
+ withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s2:d2"));
+ specExecutorList.add(new InMemorySpecExecutor(config2));
FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
@@ -61,7 +68,7 @@ public class BaseFlowEdgeFactoryTest {
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
Config edgeProps = ConfigUtils.propertiesToConfig(properties);
- FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
+ FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog, specExecutorList);
Assert.assertEquals(flowEdge.getSrc(), "node1");
Assert.assertEquals(flowEdge.getDest(), "node2");
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
index 52079d1..b01722c 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
@@ -2,8 +2,4 @@ flow.edge.source=ADLS-1
flow.edge.destination=ADLS-1
flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban03
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
index 7b1a160..def8cb9 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
@@ -2,8 +2,4 @@ flow.edge.source=ADLS-1
flow.edge.destination=ADLS-1
flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban04
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index 44d3c44..110c665 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-1
flow.edge.destination=HDFS-1
flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban01
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
index 926c51e..8e5b5ef 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-1
flow.edge.destination=HDFS-1
flow.edge.id=HDFS-1:HDFS-1:hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban01
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 897f003..5abd015 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -2,9 +2,5 @@ flow.edge.source=HDFS-1
flow.edge.destination=HDFS-3
flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban01
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
index 26454e7..8b54a22 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
flow.edge.destination=HDFS-2
flow.edge.id=HDFS-2:HDFS-2:hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban02
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index db0ea48..24339ec 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
flow.edge.destination=HDFS-2
flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban02
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 44f0408..268a414 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-2
flow.edge.destination=HDFS-4
flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban02
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
index 68ca0ca..3471277 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -2,12 +2,7 @@ flow.edge.source=HDFS-3
flow.edge.destination=ADLS-1
flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
-
+flow.edge.specExecutors=azkaban03
# Proxy config
flow.edge.proxy.host=adl-proxy.linkedin.com
flow.edge.proxy.port=1234
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
index f390546..f1d1adf 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-3
flow.edge.destination=HDFS-3
flow.edge.id=HDFS-3:HDFS-3:hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban03
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
index 04f0885..9a08893 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -2,11 +2,7 @@ flow.edge.source=HDFS-4
flow.edge.destination=ADLS-1
flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+flow.edge.specExecutors=azkaban04
# Proxy config
flow.edge.proxy.host=adl-proxy.linkedin.com
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
index 6afb3d8..0a005c8 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=HDFS-4
flow.edge.destination=HDFS-4
flow.edge.id=HDFS-4:HDFS-4:hdfsRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
-flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file
+flow.edge.specExecutors=azkaban04
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
index 268b67f..8a3809e 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
flow.edge.destination=HDFS-1
flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
+flow.edge.specExecutors=local01
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
index bc67810..626a12f 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
flow.edge.destination=HDFS-2
flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
+flow.edge.specExecutors=local01
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
index 76c77fd..c1f17eb 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
@@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1
flow.edge.destination=LocalFS-1
flow.edge.id=LocalFS-1:LocalFS-1:localRetention
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
-flow.edge.specExecutors.0.specExecInstance.uri=fs:///
-flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
-flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
-flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
+flow.edge.specExecutors=local01
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
index e0c8fa4..05fa8cd 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
@@ -1 +1,4 @@
gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
index 429d4c0..779044b 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
@@ -1 +1,4 @@
-gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
index 2d1672c..13c067a 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
@@ -1 +1,4 @@
gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
index 2d1672c..823b0a4 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
@@ -1 +1,4 @@
gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template"
+job.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+launcher.type=LOCAL
+type=java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
index 26b2ed1..0b664dd 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
@@ -52,14 +52,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource"
writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
converter.classes="org.apache.gobblin.converter.IdentityConverter"
-# =======================================
-# Job Parameters to be resolved using SpecExecutor properties
-# =======================================
-type=${specExecInstance.job.type}
-
job.jars="lib/*"
-job.lock.enabled=false
-job.class=${specExecInstance.job.launcher.class}
-
-# Gobblin Hadoop Parameters
-launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
+job.lock.enabled=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
index d9fcf6d..1a29262 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -43,14 +43,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource"
writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
converter.classes="org.apache.gobblin.converter.IdentityConverter"
-# =======================================
-# Job Parameters to be resolved using SpecExecutor properties
-# =======================================
-type=${specExecInstance.job.type}
-
job.jars="lib/*"
-job.lock.enabled=false
-job.class=${specExecInstance.job.launcher.class}
-
-# Gobblin Hadoop Parameters
-launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
+job.lock.enabled=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
new file mode 100644
index 0000000..4bbb1d0
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban01.gobblin.net:8443
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
new file mode 100644
index 0000000..1a8856b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban02.gobblin.net:8443
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
new file mode 100644
index 0000000..6261c8b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban03.gobblin.net:8443
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
new file mode 100644
index 0000000..12125fd
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+specExecInstance.uri=https://azkaban04.gobblin.net:8443
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
new file mode 100644
index 0000000..db369de
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties
@@ -0,0 +1,2 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specExecInstance.uri=fs:///
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
new file mode 100644
index 0000000..7b67f9b
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties
@@ -0,0 +1,3 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specStore.fs.dir=/tmp1
+specExecInstance.capabilities=s1:d1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
new file mode 100644
index 0000000..449a076
--- /dev/null
+++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties
@@ -0,0 +1,3 @@
+specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+specStore.fs.dir=/tmp2
+specExecInstance.capabilities=s2:d2
\ No newline at end of file