You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/09 16:40:47 UTC
[1/2] incubator-gobblin git commit: [GOBBLIN-554] Change signature of
SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 25530f075 -> ef26d287d
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
index ed6e899..68ca0ca 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-3
flow.edge.destination=ADLS-1
flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
index eae2767..04f0885 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-4
flow.edge.destination=ADLS-1
flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
[2/2] incubator-gobblin git commit: [GOBBLIN-554] Change signature of
SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.
Posted by hu...@apache.org.
[GOBBLIN-554] Change signature of SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.
Closes #2415 from sv2000/dag1
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef26d287
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef26d287
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef26d287
Branch: refs/heads/master
Commit: ef26d287d95edcc5208daed651e8ef8e09efa5d0
Parents: 25530f0
Author: suvasude <su...@linkedin.biz>
Authored: Thu Aug 9 09:40:43 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Aug 9 09:40:43 2018 -0700
----------------------------------------------------------------------
.../gobblin/runtime/api/SpecCompiler.java | 44 --
.../modules/flow/BaseFlowToJobSpecCompiler.java | 6 +-
.../flow/IdentityFlowToJobSpecCompiler.java | 71 ++--
.../modules/flow/MultiHopFlowCompiler.java | 39 +-
.../flow/MultiHopsFlowToJobSpecCompiler.java | 13 +-
.../service/modules/flow/SpecCompiler.java | 51 +++
.../modules/orchestration/Orchestrator.java | 31 +-
.../core/IdentityFlowToJobSpecCompilerTest.java | 40 +-
.../modules/flow/FlowGraphPathFinderTest.java | 417 ------------------
.../modules/flow/MultiHopFlowCompilerTest.java | 419 +++++++++++++++++++
.../hdfs-1-to-hdfs-1-encrypt.properties | 2 +-
.../flowedges/hdfs-1-to-hdfs-3.properties | 2 +-
.../hdfs-2-to-hdfs-2-encrypt.properties | 2 +-
.../flowedges/hdfs-2-to-hdfs-4.properties | 2 +-
.../flowedges/hdfs-3-to-adls-1.properties | 2 +-
.../flowedges/hdfs-4-to-adls-1.properties | 2 +-
16 files changed, 593 insertions(+), 550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
deleted file mode 100644
index aceb5dd..0000000
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.api;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.gobblin.instrumented.Instrumentable;
-
-/***
- * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
- * and the mapping to {@link SpecExecutor} that they can be run on.
- */
-public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
- /***
- * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
- * and the mapping to {@link SpecExecutor} that they can be run on.
- * @param spec {@link Spec} to compile.
- * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
- */
- Map<Spec, SpecExecutor> compileFlow(Spec spec);
-
- /***
- * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
- * is aware about.
- * @return Map of {@link Spec} URI and {@link TopologySpec}
- */
- Map<URI, TopologySpec> getTopologySpecMap();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index e4c9ae3..6604676 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
@@ -207,7 +207,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
return this.topologySpecMap;
}
- public abstract Map<Spec, SpecExecutor> compileFlow(Spec spec);
+ public abstract Dag<JobExecutionPlan> compileFlow(Spec spec);
/**
* Naive implementation of generating jobSpec, which fetch the first available template,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index 3fb20a2..fa843c5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -17,27 +17,30 @@
package org.apache.gobblin.service.modules.flow;
-
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.ServiceNode;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
/***
@@ -64,12 +67,11 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
}
@Override
- public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+ public Dag<JobExecutionPlan> compileFlow(Spec spec) {
Preconditions.checkNotNull(spec);
Preconditions.checkArgument(spec instanceof FlowSpec, "IdentityFlowToJobSpecCompiler only converts FlowSpec to JobSpec");
long startTime = System.nanoTime();
- Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
FlowSpec flowSpec = (FlowSpec) spec;
String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
@@ -77,33 +79,44 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
JobSpec jobSpec = jobSpecGenerator(flowSpec);
+ Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+ Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ List<JobExecutionPlan> jobExecutionPlans;
+ try {
+ jobExecutionPlans = getJobExecutionPlans(source, destination, jobSpec);
+ } catch (InterruptedException | ExecutionException e) {
+ Instrumented.markMeter(this.flowCompilationFailedMeter);
+ throw new RuntimeException("Cannot determine topology capabilities", e);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+
+ private List<JobExecutionPlan> getJobExecutionPlans(String source, String destination, JobSpec jobSpec)
+ throws ExecutionException, InterruptedException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (TopologySpec topologySpec : topologySpecMap.values()) {
- try {
- Map<ServiceNode, ServiceNode> capabilities = (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get();
- for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
- log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with "
- + "capability of source: %s and destination: %s ", jobSpec.getUri(),
- topologySpec.getUri(), capability.getKey(), capability.getValue()));
- if (source.equals(capability.getKey().getNodeName()) && destination.equals(capability.getValue().getNodeName())) {
- specExecutorMap.put(jobSpec, topologySpec.getSpecExecutor());
- log.info(String.format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.",
- jobSpec.getUri(), topologySpec.getUri()));
-
- log.info("Since we found a candidate executor, we will not try to compute more. "
- + "(Intended limitation for IdentityFlowToJobSpecCompiler)");
- return specExecutorMap;
- }
+ Map<ServiceNode, ServiceNode> capabilities =
+ (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get();
+ for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
+ log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with "
+ + "capability of source: %s and destination: %s ", jobSpec.getUri(), topologySpec.getUri(),
+ capability.getKey(), capability.getValue()));
+ if (source.equals(capability.getKey().getNodeName()) && destination
+ .equals(capability.getValue().getNodeName())) {
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, topologySpec.getSpecExecutor());
+ log.info(String
+ .format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.",
+ jobSpec.getUri(), topologySpec.getUri()));
+
+ log.info("Since we found a candidate executor, we will not try to compute more. "
+ + "(Intended limitation for IdentityFlowToJobSpecCompiler)");
+ jobExecutionPlans.add(jobExecutionPlan);
+ return jobExecutionPlans;
}
- } catch (InterruptedException | ExecutionException e) {
- Instrumented.markMeter(this.flowCompilationFailedMeter);
- throw new RuntimeException("Cannot determine topology capabilities", e);
}
}
- Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
- Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
- return specExecutorMap;
+ return jobExecutionPlans;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 8b14b10..a281aa4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -41,12 +42,14 @@ import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -90,53 +93,50 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
}
+ @VisibleForTesting
+ MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
+ super(config, Optional.absent(), true);
+ this.flowGraph = flowGraph;
+ }
+
public void setActive(boolean active) {
this.active = active;
this.gitFlowGraphMonitor.setActive(active);
}
/**
- * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
- * job dependencies.
- * @param spec
- * @return
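+ * Compiles the given {@link FlowSpec} into a DAG of {@link JobExecutionPlan}s by finding a path from the flow's source to its destination in the flow graph.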
+ * @param spec an instance of {@link FlowSpec}.
+ * @return A DAG of {@link JobExecutionPlan}s, which encapsulates the compiled {@link org.apache.gobblin.runtime.api.JobSpec}s
+ * together with the {@link SpecExecutor} where the job can be executed.
*/
@Override
- public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+ public Dag<JobExecutionPlan> compileFlow(Spec spec) {
Preconditions.checkNotNull(spec);
- Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs");
+ Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
long startTime = System.nanoTime();
- Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
FlowSpec flowSpec = (FlowSpec) spec;
String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
+ Dag<JobExecutionPlan> jobExecutionPlanDag;
FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
try {
//Compute the path from source to destination.
FlowGraphPath flowGraphPath = pathFinder.findPath();
//Convert the path into a Dag of JobExecutionPlans.
- Dag<JobExecutionPlan> jobExecutionPlanDag;
if (flowGraphPath != null) {
jobExecutionPlanDag = flowGraphPath.asDag();
} else {
Instrumented.markMeter(this.flowCompilationFailedMeter);
log.info(String.format("No path found from source: %s and destination: %s", source, destination));
- return null;
+ return new JobExecutionPlanDagFactory().createDag(new ArrayList<>());
}
-
- //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
- // of a Map. For now just add all specs into the map.
- for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) {
- JobExecutionPlan jobExecutionPlan = node.getValue();
- specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
- }
- } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
- | URISyntaxException e) {
+ } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | IOException e) {
Instrumented.markMeter(this.flowCompilationFailedMeter);
log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
return null;
@@ -144,7 +144,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- return specExecutorMap;
+ return jobExecutionPlanDag;
}
@Override
@@ -153,5 +153,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
return;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 544ca42..236f927 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow;
import com.google.common.base.Splitter;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -35,7 +36,10 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.policy.ServicePolicy;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.jgrapht.graph.DirectedWeightedMultigraph;
@@ -132,11 +136,16 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
}
@Override
- public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+ public Dag<JobExecutionPlan> compileFlow(Spec spec) {
// A Map from JobSpec to SpexExecutor, as the output of Flow Compiler.
Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap();
findPath(specExecutorInstanceMap, spec);
- return specExecutorInstanceMap;
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ for (Map.Entry<Spec, SpecExecutor> entry: specExecutorInstanceMap.entrySet()) {
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan((JobSpec) entry.getKey(), entry.getValue());
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
new file mode 100644
index 0000000..3ce3b70
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/***
+ * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
+ * and the mapping to {@link SpecExecutor} that they can be run on.
+ */
+public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
+ /***
+ * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
+ * and the mapping to {@link SpecExecutor} that they can be run on.
+ * @param spec {@link Spec} to compile.
+ * @return A {@link Dag} of {@link JobExecutionPlan}s, each pairing a materialized physical {@link Spec} with the {@link SpecExecutor} it can run on.
+ */
+ Dag<JobExecutionPlan> compileFlow(Spec spec);
+
+ /***
+ * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
+ * is aware about.
+ * @return Map of {@link Spec} URI and {@link TopologySpec}
+ */
+ Map<URI, TopologySpec> getTopologySpecMap();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 1b3907d..4959b1a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -46,6 +46,8 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.LoggerFactory;
@@ -186,25 +188,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
- Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
- if (specExecutorInstanceMap.isEmpty()) {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
_log.warn("Cannot determine an executor to run on for Spec: " + spec);
return;
}
// Schedule all compiled JobSpecs on their respective Executor
- for (Map.Entry<Spec, SpecExecutor> specsToExecute : specExecutorInstanceMap.entrySet()) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
// Run this spec on selected executor
SpecProducer producer = null;
try {
- producer = specsToExecute.getValue().getProducer().get();
- Spec jobSpec = specsToExecute.getKey();
+ producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+ Spec jobSpec = jobExecutionPlan.getJobSpec();
_log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
producer.addSpec(jobSpec);
} catch(Exception e) {
- _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer +
+ _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer +
" for flow: " + spec, e);
}
}
@@ -221,25 +225,26 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
- Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
- if (specExecutorInstanceMap.isEmpty()) {
+ if (jobExecutionPlanDag.isEmpty()) {
_log.warn("Cannot determine an executor to delete Spec: " + spec);
return;
}
// Delete all compiled JobSpecs on their respective Executor
- for (Map.Entry<Spec, SpecExecutor> specsToDelete : specExecutorInstanceMap.entrySet()) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
// Delete this spec on selected executor
SpecProducer producer = null;
try {
- producer = specsToDelete.getValue().getProducer().get();
- Spec jobSpec = specsToDelete.getKey();
+ producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+ Spec jobSpec = jobExecutionPlan.getJobSpec();
_log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
producer.deleteSpec(jobSpec.getUri(), headers);
} catch(Exception e) {
- _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer +
+ _log.error("Cannot successfully delete spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer +
" for flow: " + spec, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index 2dbe790..c1134a8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -16,13 +16,11 @@
*/
package org.apache.gobblin.service.modules.core;
-
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
@@ -40,14 +38,15 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
public class IdentityFlowToJobSpecCompilerTest {
private static final Logger logger = LoggerFactory.getLogger(IdentityFlowToJobSpecCompilerTest.class);
@@ -187,14 +186,15 @@ public class IdentityFlowToJobSpecCompilerTest {
FlowSpec flowSpec = initFlowSpec();
// Run compiler on flowSpec
- Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
// Assert pre-requisites
- Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
- Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec.");
+ Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+ Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec.");
// Assert FlowSpec compilation
- Spec spec = specExecutorMapping.keySet().iterator().next();
+ Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0);
+ Spec spec = dagNode.getValue().getJobSpec();
Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec.");
// Assert JobSpec properties
@@ -209,6 +209,9 @@ public class IdentityFlowToJobSpecCompilerTest {
Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP);
Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+ //Assert the start node has no children.
+ Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
}
@Test
@@ -216,14 +219,16 @@ public class IdentityFlowToJobSpecCompilerTest {
FlowSpec flowSpec = initFlowSpec();
// Run compiler on flowSpec
- Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
// Assert pre-requisites
- Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
- Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec.");
+ Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+ Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec.");
// Assert FlowSpec compilation
- Spec spec = specExecutorMapping.keySet().iterator().next();
+ Assert.assertEquals(jobExecutionPlanDag.getStartNodes().size(), 1);
+ Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0);
+ Spec spec = dagNode.getValue().getJobSpec();
Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec.");
// Assert JobSpec properties
@@ -238,6 +243,9 @@ public class IdentityFlowToJobSpecCompilerTest {
Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP);
Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+ //Assert the start node has no children.
+ Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
}
@Test
@@ -245,10 +253,10 @@ public class IdentityFlowToJobSpecCompilerTest {
FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink");
// Run compiler on flowSpec
- Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
// Assert pre-requisites
- Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
- Assert.assertTrue(specExecutorMapping.size() == 0, "Exepected 1 executor for FlowSpec.");
+ Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
+ Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 0, "Expected 0 executors for FlowSpec.");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
deleted file mode 100644
index 5d0500c..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import com.typesafe.config.ConfigSyntax;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
-
-@Slf4j
-public class FlowGraphPathFinderTest {
- private FlowGraph flowGraph;
- private FlowGraphPathFinder pathFinder;
-
- @BeforeClass
- public void setUp()
- throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
- //Create a FlowGraph
- this.flowGraph = new BaseFlowGraph();
-
- //Add DataNodes to the graph from the node properties files
- URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
- FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
- Path dataNodesPath = new Path(dataNodesUri);
- ConfigParseOptions options = ConfigParseOptions.defaults()
- .setSyntax(ConfigSyntax.PROPERTIES)
- .setAllowMissing(false);
-
- for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
- try (InputStream is = fs.open(fileStatus.getPath())) {
- Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
- Class dataNodeClass = Class.forName(ConfigUtils
- .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
- DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
- this.flowGraph.addDataNode(dataNode);
- }
- }
-
- //Create a FSFlowCatalog instance
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- // Create a FSFlowCatalog instance
- Properties properties = new Properties();
- properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(properties);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
-
- //Add FlowEdges from the edge properties files
- URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
- fs = FileSystem.get(flowEdgesURI, new Configuration());
- Path flowEdgesPath = new Path(flowEdgesURI);
- for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
- log.warn(fileStatus.getPath().toString());
- try (InputStream is = fs.open(fileStatus.getPath())) {
- Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
- Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
- FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
- FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
- FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
- this.flowGraph.addFlowEdge(edge);
- }
- }
-
- //Create a flow spec
- Properties flowProperties = new Properties();
- flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
- flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
- flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
- flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
- flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
- Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
-
- //Get the input/output dataset config from a file
- URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
- Path flowConfigPath = new Path(flowConfigUri);
- FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
- try (InputStream is = fs1.open(flowConfigPath)) {
- Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset()));
- flowConfig = flowConfig.withFallback(datasetConfig).resolve();
- }
-
- FlowSpec.Builder flowSpecBuilder = null;
- flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
- .withConfig(flowConfig)
- .withDescription("dummy description")
- .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
-
- FlowSpec spec = flowSpecBuilder.build();
- this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
- }
-
- @Test
- public void testFindPath()
- throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
- SpecNotFoundException, IOException {
- Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
- Assert.assertEquals(jobDag.getNodes().size(), 4);
- Assert.assertEquals(jobDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobDag.getEndNodes().size(), 1);
-
- //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
- Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
- JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
- JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
-
- //Ensure the resolved job config for the first hop has the correct substitutions.
- Config jobConfig = jobSpec.getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
- String from = jobConfig.getString("from");
- String to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/out/testTeam/testDataset");
- Assert.assertEquals(to, "/data/out/testTeam/testDataset");
- String sourceFsUri = jobConfig.getString("fs.uri");
- Assert.assertEquals(sourceFsUri, "file:///");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
- Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
- String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
- Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
- Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
- Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
- Assert.assertEquals(jobConfig.getString("type"), "java");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
- //Ensure the spec executor has the correct configurations
- SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-
- //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
- Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
- jobSpecWithExecutor = secondHopNode.getValue();
- jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/out/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
- Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
- specExecutor = jobSpecWithExecutor.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
- Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
- jobSpecWithExecutor = thirdHopNode.getValue();
- jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
- //Ensure the spec executor has the correct configurations
- specExecutor = jobSpecWithExecutor.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
- Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
- jobSpecWithExecutor = fourthHopNode.getValue();
- jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
- Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
- Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
- Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
- Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
- //Ensure the spec executor has the correct configurations
- specExecutor = jobSpecWithExecutor.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Ensure the fourth hop is the last
- Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
- }
-
- @Test (dependsOnMethods = "testFindPath")
- public void testFindPathAfterFirstEdgeDeletion()
- throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
- SpecNotFoundException, IOException {
- //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
-
- Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
- Assert.assertEquals(jobDag.getNodes().size(), 4);
- Assert.assertEquals(jobDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobDag.getEndNodes().size(), 1);
-
- //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
- Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
- JobExecutionPlan jobExecutionPlan = startNode.getValue();
- JobSpec jobSpec = jobExecutionPlan.getJobSpec();
-
- //Ensure the resolved job config for the first hop has the correct substitutions.
- Config jobConfig = jobSpec.getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
- String from = jobConfig.getString("from");
- String to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/out/testTeam/testDataset");
- Assert.assertEquals(to, "/data/out/testTeam/testDataset");
- String sourceFsUri = jobConfig.getString("fs.uri");
- Assert.assertEquals(sourceFsUri, "file:///");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
- Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
- String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
- Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
- Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
- Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
- Assert.assertEquals(jobConfig.getString("type"), "java");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
- //Ensure the spec executor has the correct configurations
- SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-
- //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
- Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
- jobExecutionPlan = secondHopNode.getValue();
- jobConfig = jobExecutionPlan.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/out/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
- Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
- specExecutor = jobExecutionPlan.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
- Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
- jobExecutionPlan = thirdHopNode.getValue();
- jobConfig = jobExecutionPlan.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
- //Ensure the spec executor has the correct configurations
- specExecutor = jobExecutionPlan.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
- Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
- Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
- jobExecutionPlan = fourthHopNode.getValue();
- jobConfig = jobExecutionPlan.getJobSpec().getConfig();
- Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
- from = jobConfig.getString("from");
- to = jobConfig.getString("to");
- Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
- Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
- Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
- Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
- Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
- Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
- Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
- Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
- Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
- //Ensure the spec executor has the correct configurations
- specExecutor = jobExecutionPlan.getSpecExecutor();
- Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
- Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
-
- //Ensure the fourth hop is the last
- Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
- }
-
- @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
- public void testFindPathAfterSecondEdgeDeletion()
- throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
- SpecNotFoundException, IOException {
- //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
- this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
-
- //Ensure no path to destination.
- Assert.assertNull(pathFinder.findPath());
- }
-
- @AfterClass
- public void tearDown() {
- }
-
- public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
- // Executor Instance
- protected final Config config;
-
- private SpecProducer<Spec> azkabanSpecProducer;
-
- public TestAzkabanSpecExecutor(Config config) {
- super(config);
- this.config = config;
- }
-
- @Override
- protected void startUp() throws Exception {
- //Do nothing
- }
-
- @Override
- protected void shutDown() throws Exception {
- //Do nothing
- }
-
- @Override
- public Future<String> getDescription() {
- return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
- }
-
- @Override
- public Future<? extends SpecProducer> getProducer() {
- return new CompletedFuture<>(this.azkabanSpecProducer, null);
- }
-
- @Override
- public Future<Config> getConfig() {
- return new CompletedFuture<>(config, null);
- }
-
- @Override
- public Future<String> getHealth() {
- return new CompletedFuture<>("Healthy", null);
- }
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
new file mode 100644
index 0000000..b8bac02
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class MultiHopFlowCompilerTest {
+ private FlowGraph flowGraph;
+ private SpecCompiler specCompiler;
+
+ @BeforeClass
+ public void setUp()
+ throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+ //Create a FlowGraph
+ this.flowGraph = new BaseFlowGraph();
+
+ //Add DataNodes to the graph from the node properties files
+ URI dataNodesUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+ FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+ Path dataNodesPath = new Path(dataNodesUri);
+ ConfigParseOptions options = ConfigParseOptions.defaults()
+ .setSyntax(ConfigSyntax.PROPERTIES)
+ .setAllowMissing(false);
+
+ for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class dataNodeClass = Class.forName(ConfigUtils
+ .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+ DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+ this.flowGraph.addDataNode(dataNode);
+ }
+ }
+
+ //Create a FSFlowCatalog instance
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ // Create a FSFlowCatalog instance
+ Properties properties = new Properties();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+
+ //Add FlowEdges from the edge properties files
+ URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+ fs = FileSystem.get(flowEdgesURI, new Configuration());
+ Path flowEdgesPath = new Path(flowEdgesURI);
+ for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+ log.warn(fileStatus.getPath().toString());
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+ FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+ FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+ this.flowGraph.addFlowEdge(edge);
+ }
+ }
+
+ this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
+ }
+
+ /**
+  * Creates a {@link FlowSpec} from fixed flow properties, the given source/destination identifiers and the
+  * dataset config read from the classpath resource {@code flowConfigResource} (used as fallback values).
+  */
+ private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination) throws IOException, URISyntaxException {
+   Properties flowProperties = new Properties();
+   flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+   flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+   flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+   flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
+   flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination);
+   Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+   //Get the input/output dataset config from a file
+   URI flowConfigUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource(flowConfigResource).toURI();
+   FileSystem fs = FileSystem.get(flowConfigUri, new Configuration());
+   try (InputStream is = fs.open(new Path(flowConfigUri))) {
+     // Decode explicitly as UTF-8 so the parsed config does not depend on the platform default charset.
+     Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8));
+     flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+   }
+
+   return FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+       .withConfig(flowConfig)
+       .withDescription("dummy description")
+       .withVersion(FlowSpec.Builder.DEFAULT_VERSION)
+       .build();
+ }
+
+ /** Compiles a LocalFS-1 to ADLS-1 flow on the unmodified graph and verifies each of the four hops of the DAG. */
+ @Test
+ public void testCompileFlow() throws URISyntaxException, IOException {
+   FlowSpec flowSpec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+   Dag<JobExecutionPlan> dag = this.specCompiler.compileFlow(flowSpec);
+   Assert.assertEquals(dag.getNodes().size(), 4);
+   Assert.assertEquals(dag.getStartNodes().size(), 1);
+   Assert.assertEquals(dag.getEndNodes().size(), 1);
+
+   //Hop 1 - Distcp from "LocalFS-1" to "HDFS-1"
+   Dag.DagNode<JobExecutionPlan> firstHop = dag.getStartNodes().get(0);
+   JobExecutionPlan plan = firstHop.getValue();
+
+   //The resolved job config of the first hop must carry the expected substitutions.
+   Config hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+   String inputPath = hopConfig.getString("from");
+   String outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/out/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/out/testTeam/testDataset");
+   String srcFsUri = hopConfig.getString("fs.uri");
+   Assert.assertEquals(srcFsUri, "file:///");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), srcFsUri);
+   Assert.assertEquals(hopConfig.getString("state.store.fs.uri"), srcFsUri);
+   String dstFsUri = hopConfig.getString("target.filebased.fs.uri");
+   Assert.assertEquals(dstFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("writer.fs.uri"), dstFsUri);
+   Assert.assertEquals(hopConfig.getString("gobblin.dataset.pattern"), inputPath);
+   Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), outputPath);
+   Assert.assertEquals(hopConfig.getString("type"), "java");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "LOCAL");
+   //Verify the spec executor attached to this hop
+   SpecExecutor executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "fs:///");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+   //Hop 2 - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+   Assert.assertEquals(dag.getChildren(firstHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> secondHop = dag.getChildren(firstHop).get(0);
+   plan = secondHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/out/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.data.directory"), inputPath);
+   Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), outputPath);
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //Hop 3 - "Distcp HDFS-1 to HDFS-3"
+   Assert.assertEquals(dag.getChildren(secondHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> thirdHop = dag.getChildren(secondHop).get(0);
+   plan = thirdHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+   //Verify the spec executor attached to this hop
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //Hop 4 - "Distcp from HDFS3 to ADLS-1"
+   Assert.assertEquals(dag.getChildren(thirdHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> fourthHop = dag.getChildren(thirdHop).get(0);
+   plan = fourthHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+   Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+   Assert.assertEquals(hopConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+   Assert.assertEquals(hopConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+   Assert.assertEquals(hopConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+   //Verify the spec executor attached to this hop
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //The fourth hop must be the DAG's end node
+   Assert.assertEquals(dag.getEndNodes().get(0), fourthHop);
+ }
+
+ /** After removing the HDFS-1 convert-and-encrypt self edge, verifies the four hops of the recompiled DAG. */
+ @Test (dependsOnMethods = "testCompileFlow")
+ public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException {
+   //Remove the convert-to-json-and-encrypt self edge on HDFS-1.
+   this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+   FlowSpec flowSpec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+   Dag<JobExecutionPlan> dag = this.specCompiler.compileFlow(flowSpec);
+
+   Assert.assertEquals(dag.getNodes().size(), 4);
+   Assert.assertEquals(dag.getStartNodes().size(), 1);
+   Assert.assertEquals(dag.getEndNodes().size(), 1);
+
+   //Hop 1 - Distcp from "LocalFS-1" to "HDFS-2"
+   Dag.DagNode<JobExecutionPlan> firstHop = dag.getStartNodes().get(0);
+   JobExecutionPlan plan = firstHop.getValue();
+
+   //The resolved job config of the first hop must carry the expected substitutions.
+   Config hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+   String inputPath = hopConfig.getString("from");
+   String outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/out/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/out/testTeam/testDataset");
+   String srcFsUri = hopConfig.getString("fs.uri");
+   Assert.assertEquals(srcFsUri, "file:///");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), srcFsUri);
+   Assert.assertEquals(hopConfig.getString("state.store.fs.uri"), srcFsUri);
+   String dstFsUri = hopConfig.getString("target.filebased.fs.uri");
+   Assert.assertEquals(dstFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("writer.fs.uri"), dstFsUri);
+   Assert.assertEquals(hopConfig.getString("gobblin.dataset.pattern"), inputPath);
+   Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), outputPath);
+   Assert.assertEquals(hopConfig.getString("type"), "java");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "LOCAL");
+   //Verify the spec executor attached to this hop
+   SpecExecutor executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "fs:///");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+   //Hop 2 - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+   Assert.assertEquals(dag.getChildren(firstHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> secondHop = dag.getChildren(firstHop).get(0);
+   plan = secondHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/out/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.data.directory"), inputPath);
+   Assert.assertEquals(hopConfig.getString("data.publisher.final.dir"), outputPath);
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //Hop 3 - "Distcp HDFS-2 to HDFS-4"
+   Assert.assertEquals(dag.getChildren(secondHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> thirdHop = dag.getChildren(secondHop).get(0);
+   plan = thirdHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+   //Verify the spec executor attached to this hop
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //Hop 4 - "Distcp from HDFS4 to ADLS-1"
+   Assert.assertEquals(dag.getChildren(thirdHop).size(), 1);
+   Dag.DagNode<JobExecutionPlan> fourthHop = dag.getChildren(thirdHop).get(0);
+   plan = fourthHop.getValue();
+   hopConfig = plan.getJobSpec().getConfig();
+   Assert.assertEquals(hopConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+   inputPath = hopConfig.getString("from");
+   outputPath = hopConfig.getString("to");
+   Assert.assertEquals(inputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(outputPath, "/data/encrypted/testTeam/testDataset");
+   Assert.assertEquals(hopConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+   Assert.assertEquals(hopConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+   Assert.assertEquals(hopConfig.getString("type"), "hadoopJava");
+   Assert.assertEquals(hopConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+   Assert.assertEquals(hopConfig.getString("launcher.type"), "MAPREDUCE");
+   Assert.assertEquals(hopConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+   Assert.assertEquals(hopConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+   Assert.assertEquals(hopConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+   //Verify the spec executor attached to this hop
+   executor = plan.getSpecExecutor();
+   Assert.assertEquals(executor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+   Assert.assertEquals(executor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
+
+   //The fourth hop must be the DAG's end node
+   Assert.assertEquals(dag.getEndNodes().get(0), fourthHop);
+ }
+
+ /** After also removing the HDFS-2 convert-and-encrypt self edge, compilation must yield an empty DAG. */
+ @Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion")
+ public void testCompileFlowAfterSecondEdgeDeletion() throws URISyntaxException, IOException {
+   //Remove the convert-to-json-and-encrypt self edge on HDFS-2.
+   this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+
+   FlowSpec flowSpec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1");
+   Dag<JobExecutionPlan> dag = this.specCompiler.compileFlow(flowSpec);
+   //No path to the destination should remain.
+   Assert.assertTrue(dag.isEmpty());
+ }
+
+ @AfterClass
+ public void tearDown() { // Intentionally empty: this class performs no teardown work.
+ }
+
+ /** Test-only spec executor referenced by class name from the flow edge property files; lifecycle hooks are no-ops. */
+ public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+   // Config this executor was constructed with
+   protected final Config config;
+   // Never assigned in this class, so getProducer() completes with null.
+   private SpecProducer<Spec> specProducer;
+
+   public TestAzkabanSpecExecutor(Config config) {
+     super(config);
+     this.config = config;
+   }
+
+   @Override
+   public Future<String> getHealth() {
+     return new CompletedFuture<>("Healthy", null);
+   }
+
+   @Override
+   public Future<Config> getConfig() {
+     return new CompletedFuture<>(this.config, null);
+   }
+
+   @Override
+   public Future<? extends SpecProducer> getProducer() {
+     return new CompletedFuture<>(this.specProducer, null);
+   }
+
+   @Override
+   public Future<String> getDescription() {
+     return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+   }
+
+   @Override
+   protected void startUp() throws Exception {
+     // No start-up work required
+   }
+
+   @Override
+   protected void shutDown() throws Exception {
+     // No shut-down work required
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
index bcf6d44..44d3c44 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-1
flow.edge.destination=HDFS-1
flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
index 99d1ed7..897f003 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-1
flow.edge.destination=HDFS-3
flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
index 537cbfa..db0ea48 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-2
flow.edge.destination=HDFS-2
flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
index 6ec2ea5..44f0408 100644
--- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -2,7 +2,7 @@ flow.edge.source=HDFS-2
flow.edge.destination=HDFS-4
flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE