You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:38 UTC
[2/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
new file mode 100644
index 0000000..5d0500c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class FlowGraphPathFinderTest {
+ private FlowGraph flowGraph;
+ private FlowGraphPathFinder pathFinder;
+
+ @BeforeClass
+ public void setUp()
+ throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+ //Create a FlowGraph
+ this.flowGraph = new BaseFlowGraph();
+
+ //Add DataNodes to the graph from the node properties files
+ URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+ FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+ Path dataNodesPath = new Path(dataNodesUri);
+ ConfigParseOptions options = ConfigParseOptions.defaults()
+ .setSyntax(ConfigSyntax.PROPERTIES)
+ .setAllowMissing(false);
+
+ for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class dataNodeClass = Class.forName(ConfigUtils
+ .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+ DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+ this.flowGraph.addDataNode(dataNode);
+ }
+ }
+
+ //Create a FSFlowCatalog instance
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ // Create a FSFlowCatalog instance
+ Properties properties = new Properties();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+
+ //Add FlowEdges from the edge properties files
+ URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+ fs = FileSystem.get(flowEdgesURI, new Configuration());
+ Path flowEdgesPath = new Path(flowEdgesURI);
+ for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+ log.warn(fileStatus.getPath().toString());
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+ FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+ FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+ this.flowGraph.addFlowEdge(edge);
+ }
+ }
+
+ //Create a flow spec
+ Properties flowProperties = new Properties();
+ flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+ flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+ flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+ flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
+ flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
+ Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+ //Get the input/output dataset config from a file
+ URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
+ Path flowConfigPath = new Path(flowConfigUri);
+ FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
+ try (InputStream is = fs1.open(flowConfigPath)) {
+ Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset()));
+ flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+ }
+
+ FlowSpec.Builder flowSpecBuilder = null;
+ flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+ .withConfig(flowConfig)
+ .withDescription("dummy description")
+ .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
+
+ FlowSpec spec = flowSpecBuilder.build();
+ this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
+ }
+
+ @Test
+ public void testFindPath()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+ Assert.assertEquals(jobDag.getNodes().size(), 4);
+ Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+ Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+ //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
+ Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+ JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
+ JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
+
+ //Ensure the resolved job config for the first hop has the correct substitutions.
+ Config jobConfig = jobSpec.getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ String from = jobConfig.getString("from");
+ String to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+ String sourceFsUri = jobConfig.getString("fs.uri");
+ Assert.assertEquals(sourceFsUri, "file:///");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+ Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+ String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+ Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+ Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ Assert.assertEquals(jobConfig.getString("type"), "java");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+ //Ensure the spec executor has the correct configurations
+ SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+ //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+ Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+ jobSpecWithExecutor = secondHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
+ Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+ jobSpecWithExecutor = thirdHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
+ Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+ jobSpecWithExecutor = fourthHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+ Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+ Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Ensure the fourth hop is the last
+ Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+ }
+
+ @Test (dependsOnMethods = "testFindPath")
+ public void testFindPathAfterFirstEdgeDeletion()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
+ this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+ Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+ Assert.assertEquals(jobDag.getNodes().size(), 4);
+ Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+ Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+ //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
+ Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+ JobExecutionPlan jobExecutionPlan = startNode.getValue();
+ JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+
+ //Ensure the resolved job config for the first hop has the correct substitutions.
+ Config jobConfig = jobSpec.getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ String from = jobConfig.getString("from");
+ String to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+ String sourceFsUri = jobConfig.getString("fs.uri");
+ Assert.assertEquals(sourceFsUri, "file:///");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+ Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+ String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+ Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+ Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ Assert.assertEquals(jobConfig.getString("type"), "java");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+ //Ensure the spec executor has the correct configurations
+ SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+ //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+ Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+ jobExecutionPlan = secondHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
+ Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+ jobExecutionPlan = thirdHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
+ Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+ jobExecutionPlan = fourthHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+ Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+ Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Ensure the fourth hop is the last
+ Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+ }
+
+ @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
+ public void testFindPathAfterSecondEdgeDeletion()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
+ this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+
+ //Ensure no path to destination.
+ Assert.assertNull(pathFinder.findPath());
+ }
+
+ @AfterClass
+ public void tearDown() {
+ }
+
+ public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+ // Executor Instance
+ protected final Config config;
+
+ private SpecProducer<Spec> azkabanSpecProducer;
+
+ public TestAzkabanSpecExecutor(Config config) {
+ super(config);
+ this.config = config;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ //Do nothing
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ //Do nothing
+ }
+
+ @Override
+ public Future<String> getDescription() {
+ return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+ }
+
+ @Override
+ public Future<? extends SpecProducer> getProducer() {
+ return new CompletedFuture<>(this.azkabanSpecProducer, null);
+ }
+
+ @Override
+ public Future<Config> getConfig() {
+ return new CompletedFuture<>(config, null);
+ }
+
+ @Override
+ public Future<String> getHealth() {
+ return new CompletedFuture<>("Healthy", null);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
new file mode 100644
index 0000000..2694f5c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BaseFlowEdgeFactoryTest {
+  /**
+   * Builds the edge "node1:node2:edge1" with {@link BaseFlowEdge.Factory} from properties that declare two
+   * InMemorySpecExecutors. Checks that the edge endpoints, and each executor's class and configuration, are
+   * taken from those properties.
+   */
+  @Test
+  public void testCreateFlowEdge() throws Exception {
+    // Template catalog backed by the "template_catalog" test resource.
+    URI catalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties catalogProps = new Properties();
+    catalogProps.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, catalogUri.toString());
+    Config catalogConfig = ConfigFactory.parseProperties(catalogProps);
+    FSFlowCatalog flowCatalog = new FSFlowCatalog(catalogConfig.withValue(
+        ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        catalogConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
+
+    // Edge definition: endpoints, name/id, template location and two spec executors (indices 0 and 1).
+    Properties edgeProps = new Properties();
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "node1");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,
+        "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir", "/tmp1");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities", "s1:d1");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,
+        "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir", "/tmp2");
+    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities", "s2:d2");
+
+    FlowEdgeFactory edgeFactory = new BaseFlowEdge.Factory();
+    FlowEdge edge = edgeFactory.createFlowEdge(ConfigUtils.propertiesToConfig(edgeProps), flowCatalog);
+
+    Assert.assertEquals(edge.getSrc(), "node1");
+    Assert.assertEquals(edge.getDest(), "node2");
+
+    Config firstExecutorConfig = edge.getExecutors().get(0).getConfig().get();
+    Assert.assertEquals(firstExecutorConfig.getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(firstExecutorConfig.getString("specExecInstance.capabilities"), "s1:d1");
+
+    Config secondExecutorConfig = edge.getExecutors().get(1).getConfig().get();
+    Assert.assertEquals(secondExecutorConfig.getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(secondExecutorConfig.getString("specExecInstance.capabilities"), "s2:d2");
+
+    Assert.assertEquals(edge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(edge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 04f2270..be7b597 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.flowgraph;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
@@ -33,8 +32,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.apache.gobblin.service.modules.template.StaticFlowTemplate;
import org.apache.gobblin.util.ConfigUtils;
@@ -57,9 +54,7 @@ public class BaseFlowGraphTest {
BaseFlowGraph graph;
@BeforeClass
- public void setUp()
- throws URISyntaxException, ReflectiveOperationException, JobTemplate.TemplateException, SpecNotFoundException,
- IOException, DataNode.DataNodeCreationException {
+ public void setUp() throws URISyntaxException, DataNode.DataNodeCreationException {
Properties properties = new Properties();
properties.put("key1", "val1");
Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
@@ -81,9 +76,9 @@ public class BaseFlowGraphTest {
//Create a clone of node3
node3c = new BaseDataNode(node3Config);
- FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
- FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
- FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
+ FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null);
+ FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null);
+ FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null);
//Create edge instances
edgeId1 = "node1:node2:edge1";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
new file mode 100644
index 0000000..2542f5e
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Unit tests for {@link JobExecutionPlanDagFactory}: one {@link JobExecutionPlan} is built per job template of the
+ * "flowEdgeTemplate" flow template in the test template catalog, and the DAG created from those plans is checked
+ * to have the shape {@code job1 -> {job2, job3} -> job4}.
+ */
+public class JobExecutionPlanDagFactoryTest {
+  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME;
+  private SpecExecutor specExecutor;
+  private List<JobTemplate> jobTemplates;
+
+  /**
+   * Loads the job templates of the {@code TEST_TEMPLATE_URI} flow template through an {@link FSFlowCatalog} rooted
+   * at the "template_catalog" classpath resource, and creates the {@link InMemorySpecExecutor} used by every plan.
+   */
+  @BeforeClass
+  public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
+    // Create a FSFlowCatalog instance rooted at the "template_catalog" test resource directory.
+    URL templateCatalogUrl = this.getClass().getClassLoader().getResource("template_catalog");
+    Assert.assertNotNull(templateCatalogUrl, "Classpath resource 'template_catalog' not found");
+    URI flowTemplateCatalogUri = templateCatalogUrl.toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+    this.jobTemplates = flowTemplate.getJobTemplates();
+
+    //Create a spec executor instance
+    properties = new Properties();
+    properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
+    properties.put("specExecInstance.capabilities", "source:destination");
+    Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
+    this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
+  }
+
+  /**
+   * Builds one {@link JobExecutionPlan} per job template, creates a DAG from them with
+   * {@link JobExecutionPlanDagFactory}, and checks the DAG's start node, end node and edges.
+   */
+  @Test
+  public void testCreateDag() throws Exception {
+    //Create one JobExecutionPlan per job template. Each JobSpec URI is the template file name without its
+    //extension, e.g. "job1.job" -> "job1".
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (JobTemplate jobTemplate: this.jobTemplates) {
+      String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
+      jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+          withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor));
+    }
+
+    //Create a DAG from job execution plans.
+    Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+    //Test DAG properties
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().size(), 4);
+    String startNodeName = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(startNodeName, "job1");
+    String templateUri = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job1.job");
+    String endNodeName = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(endNodeName, "job4");
+    templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job4.job");
+
+    //job1 must fan out to exactly two jobs (job2 and job3), each of which feeds only into job4.
+    Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
+    List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
+    Assert.assertEquals(nextNodes.size(), 2);
+    Set<String> nodeSet = new HashSet<>();
+    for (Dag.DagNode<JobExecutionPlan> node: nextNodes) {
+      nodeSet.add(new Path(node.getValue().getJobSpec().getUri()).getName());
+      List<Dag.DagNode<JobExecutionPlan>> childNodes = dag.getChildren(node);
+      Assert.assertEquals(childNodes.size(), 1);
+      Assert.assertEquals(new Path(childNodes.get(0).getValue().getJobSpec().getUri()).getName(), "job4");
+    }
+    Assert.assertTrue(nodeSet.contains("job2"));
+    Assert.assertTrue(nodeSet.contains("job3"));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
deleted file mode 100644
index 58d879e..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-public class JobTemplateDagFactoryTest {
- private static final String TEST_TEMPLATE_NAME = "test-template";
- private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
- private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
- FSFlowCatalog catalog;
-
- @BeforeClass
- public void setUp()
- throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
- // Create a FSFlowCatalog instance
- Properties properties = new Properties();
- properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(properties);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.catalog = new FSFlowCatalog(templateCatalogCfg);
- }
-
- @Test
- public void testCreateDagFromJobTemplates() throws Exception {
- FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
- List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
-
- //Create a DAG from job templates.
- Dag<JobTemplate> jobTemplateDag = JobTemplateDagFactory.createDagFromJobTemplates(jobTemplates);
-
- //Test DAG properties
- Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getNodes().size(), 4);
- String startNodeName = new Path(jobTemplateDag.getStartNodes().get(0).getValue().getUri()).getName();
- Assert.assertEquals(startNodeName, "job1.conf");
- String endNodeName = new Path(jobTemplateDag.getEndNodes().get(0).getValue().getUri()).getName();
- Assert.assertEquals(endNodeName, "job4.conf");
-
- Dag.DagNode<JobTemplate> startNode = jobTemplateDag.getStartNodes().get(0);
- List<Dag.DagNode<JobTemplate>> nextNodes = jobTemplateDag.getChildren(startNode);
- Set<String> nodeSet = new HashSet<>();
- for(Dag.DagNode<JobTemplate> node: nextNodes) {
- nodeSet.add(new Path(node.getValue().getUri()).getName());
- Dag.DagNode<JobTemplate> nextNode = jobTemplateDag.getChildren(node).get(0);
- Assert.assertEquals(new Path(nextNode.getValue().getUri()).getName(), "job4.conf");
- }
- Assert.assertTrue(nodeSet.contains("job2.conf"));
- Assert.assertTrue(nodeSet.contains("job3.conf"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
index b20606f..3c8ebd3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
@@ -23,19 +23,20 @@ import java.util.Properties;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.dataset.HdfsDatasetDescriptor;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.testng.collections.Lists;
@@ -43,9 +44,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FSFlowCatalogTest {
- private static final String TEST_TEMPLATE_NAME = "test-template";
- private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
- private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
+ private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+ private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
@Test
public void testGetFlowTemplate() throws Exception {
@@ -58,50 +58,45 @@ public class FSFlowCatalogTest {
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
- FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+ FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
//Basic sanity check for the FlowTemplate
- Dag<JobTemplate> jobTemplateDag = flowTemplate.getDag();
- List<Dag.DagNode<JobTemplate>> dagNodes = jobTemplateDag.getNodes();
- Assert.assertTrue(dagNodes.size() == 4);
- Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
- Dag.DagNode<JobTemplate> dagNode = jobTemplateDag.getStartNodes().get(0);
- URI startNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job1.conf").toURI();
- URI endNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job4.conf").toURI();
- Assert.assertEquals(jobTemplateDag.getStartNodes().get(0).getValue().getUri(), startNodeUri);
- Assert.assertEquals(jobTemplateDag.getEndNodes().get(0).getValue().getUri(), endNodeUri);
List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
Assert.assertEquals(jobTemplates.size(), 4);
- for(int i=0; i<4; i++) {
+ for (int i = 0; i < 4; i++) {
String uri = new Path(jobTemplates.get(i).getUri()).getName().split("\\.")[0];
String templateId = uri.substring(uri.length() - 1);
- for(int j=0; j<2; j++) {
+ for (int j = 0; j < 2; j++) {
Config jobTemplateConfig = jobTemplates.get(i).getRawTemplateConfig();
- String suffix = templateId + Integer.toString(j+1);
+ String suffix = templateId + Integer.toString(j + 1);
Assert.assertEquals(jobTemplateConfig.getString("key" + suffix), "val" + suffix);
}
}
- List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getInputOutputDatasetDescriptors();
+ Config flowConfig = ConfigFactory.empty().withValue("team.name", ConfigValueFactory.fromAnyRef("test-team"))
+ .withValue("dataset.name", ConfigValueFactory.fromAnyRef("test-dataset"));
+
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getResolvingDatasetDescriptors(flowConfig);
Assert.assertTrue(inputOutputDescriptors.size() == 2);
List<String> dirs = Lists.newArrayList("inbound", "outbound");
- for(int i=0; i<2; i++) {
- for (int j=0; j<2; j++) {
- HdfsDatasetDescriptor datasetDescriptor;
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < 2; j++) {
+ FSDatasetDescriptor datasetDescriptor;
if (j == 0) {
- datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
+ datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
} else {
- datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
+ datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
}
Assert.assertEquals(datasetDescriptor.getPlatform(), "hdfs");
- Assert.assertEquals(datasetDescriptor.getFormat(), "avro");
- Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/<TEAM_NAME>/<DATASET_NAME>");
+ Assert.assertEquals(datasetDescriptor.getFormatConfig().getFormat(), "avro");
+ Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/test-team/test-dataset");
}
}
Config flowTemplateConfig = flowTemplate.getRawTemplateConfig();
- Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.input.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
- Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.output.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
+ Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX + ".0."
+ + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
+ Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX
+ + ".0." + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flow/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf
new file mode 100644
index 0000000..f818df6
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow.conf
@@ -0,0 +1,24 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+adls.user.to.proxy=adlsTestUser
+adls.oauth2.client.id=1234
+adls.ouath2.credential=credential
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - compressed and encrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=adls
+gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.output.dataset.descriptor.format=json
+gobblin.flow.output.dataset.descriptor.codec=gzip
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
new file mode 100644
index 0000000..a219e4f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=ADLS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.AdlsDataNode
+data.node.fs.uri=adl://azuredatalakestore.net/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
new file mode 100644
index 0000000..cad5e03
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn01.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
new file mode 100644
index 0000000..eeb7980
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-2
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn02.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
new file mode 100644
index 0000000..61135ba
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-3
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn03.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
new file mode 100644
index 0000000..a772f1c
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-4
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn04.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
new file mode 100644
index 0000000..6683221
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=LocalFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.LocalFSDataNode
+data.node.fs.uri=file:///
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
new file mode 100644
index 0000000..bcf6d44
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
new file mode 100644
index 0000000..99d1ed7
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -0,0 +1,10 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-3
+flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
new file mode 100644
index 0000000..537cbfa
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-2
+flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
new file mode 100644
index 0000000..6ec2ea5
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-4
+flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
new file mode 100644
index 0000000..ed6e899
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-3
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
new file mode 100644
index 0000000..eae2767
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-4
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
new file mode 100644
index 0000000..268b67f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
new file mode 100644
index 0000000..bc67810
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-2
+flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
deleted file mode 100644
index 43fa9a3..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.dircache.DirCache;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-
-
-public class GitFlowGraphMonitorTest {
- private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);
- private Repository remoteRepo;
- private Git gitForPush;
- private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
- private final File remoteDir = new File(TEST_DIR + "/remote");
- private final File cloneDir = new File(TEST_DIR + "/clone");
- private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
- private static final String NODE_1_FILE = "node1.properties";
- private final File node1Dir = new File(flowGraphDir, "node1");
- private final File node1File = new File(node1Dir, NODE_1_FILE);
- private static final String NODE_2_FILE = "node2.properties";
- private final File node2Dir = new File(flowGraphDir, "node2");
- private final File node2File = new File(node2Dir, NODE_2_FILE);
- private final File edge1Dir = new File(node1Dir, "node2");
- private final File edge1File = new File(edge1Dir, "edge1.properties");
-
- private RefSpec masterRefSpec = new RefSpec("master");
- private FSFlowCatalog flowCatalog;
- private Config config;
- private BaseFlowGraph flowGraph;
- private GitFlowGraphMonitor gitFlowGraphMonitor;
-
- @BeforeClass
- public void setUp() throws Exception {
- cleanUpDir(TEST_DIR);
-
- // Create a bare repository
- RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
- this.remoteRepo = fileKey.open(false);
- this.remoteRepo.create(true);
-
- this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
- // push an empty commit as a base for detecting changes
- this.gitForPush.commit().setMessage("First commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.config = ConfigBuilder.create()
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
- + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
- .build();
-
- // Create a FSFlowCatalog instance
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- Properties properties = new Properties();
- properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(properties);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
- //Create a FlowGraph instance with defaults
- this.flowGraph = new BaseFlowGraph();
-
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
- this.gitFlowGraphMonitor.setActive(true);
- }
-
- private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
- throws IOException, GitAPIException {
- // push a new node file
- nodeDir.mkdirs();
- nodeFile.createNewFile();
- Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
-
- // add, commit, push node
- this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
- this.gitForPush.commit().setMessage("Node commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if node1 has been added to the FlowGraph
- DataNode dataNode = this.flowGraph.getNode(nodeId);
- Assert.assertEquals(dataNode.getId(), nodeId);
- Assert.assertTrue(dataNode.isActive());
- Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
- }
-
- @Test
- public void testAddNode()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
- testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
- }
-
- @Test (dependsOnMethods = "testAddNode")
- public void testAddEdge()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- // push a new node file
- this.edge1Dir.mkdirs();
- this.edge1File.createNewFile();
-
- Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
-
- // add, commit, push
- this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
- this.gitForPush.commit().setMessage("Edge commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if edge1 has been added to the FlowGraph
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
- FlowEdge flowEdge = edgeSet.iterator().next();
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
- }
-
- @Test (dependsOnMethods = "testAddNode")
- public void testUpdateEdge()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- //Update edge1 file
- Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
- + "key1=value1\n", edge1File, Charsets.UTF_8);
-
- // add, commit, push
- this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
- this.gitForPush.commit().setMessage("Edge commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if new edge1 has been added to the FlowGraph
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
- FlowEdge flowEdge = edgeSet.iterator().next();
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
- }
-
- @Test (dependsOnMethods = "testUpdateEdge")
- public void testUpdateNode()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- //Update param1 value in node1 and check if updated node is added to the graph
- testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
- }
-
-
- @Test (dependsOnMethods = "testUpdateNode")
- public void testRemoveEdge() throws GitAPIException, IOException {
- // delete a config file
- edge1File.delete();
-
- //Node1 has 1 edge before delete
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
-
- // delete, commit, push
- DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
- this.edge1Dir.getName(), this.edge1File.getName())).call();
- RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if edge1 has been deleted from the graph
- edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertTrue(edgeSet.size() == 0);
- }
-
- @Test (dependsOnMethods = "testRemoveEdge")
- public void testRemoveNode() throws GitAPIException, IOException {
- //delete node file
- node1File.delete();
-
- //node1 is present in the graph before delete
- DataNode node1 = this.flowGraph.getNode("node1");
- Assert.assertNotNull(node1);
-
- // delete, commit, push
- DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
- RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if node1 has been deleted from the graph
- node1 = this.flowGraph.getNode("node1");
- Assert.assertNull(node1);
- }
-
-
- private void cleanUpDir(String dir) {
- File specStoreDir = new File(dir);
-
- // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
- for (int i = 0; i < 5; i++) {
- try {
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
- // if delete succeeded then break out of loop
- break;
- } catch (IOException e) {
- logger.warn("Cleanup delete directory failed for directory: " + dir, e);
- }
- }
- }
-
- private String formNodeFilePath(String groupDir, String fileName) {
- return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
- }
-
- private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
- return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- cleanUpDir(TEST_DIR);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
deleted file mode 100644
index 9dd51a0..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-import java.util.Properties;
-
-import org.apache.gobblin.util.ConfigUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class BaseFlowEdgeFactoryTest {
- @Test
- public void testCreateFlowEdge() throws Exception {
- Properties properties = new Properties();
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "FS:///test-template/flow.conf");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
-
- FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
-
- Properties props = new Properties();
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(props);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
- Config edgeProps = ConfigUtils.propertiesToConfig(properties);
- FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
- }
-}
\ No newline at end of file