Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:37 UTC

[1/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 33d4fea4b -> 22a951f0a


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
new file mode 100644
index 0000000..64d6921
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
@@ -0,0 +1,20 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/inbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
+
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/outbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.1.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+gobblin.flow.edge.output.dataset.descriptor.1.format=${gobblin.flow.edge.input.dataset.descriptor.1.format}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
new file mode 100644
index 0000000..0d4f7c3
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/templates/job1.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
new file mode 100644
index 0000000..c26ade4
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
@@ -0,0 +1,3 @@
+gobblin.template.uri="resource:///template_catalog/templates/job2.template"
+
+dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
new file mode 100644
index 0000000..cac20ed
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
@@ -0,0 +1,2 @@
+gobblin.template.uri="resource:///template_catalog/templates/job3.template"
+dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
new file mode 100644
index 0000000..9b86c77
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
@@ -0,0 +1,2 @@
+gobblin.template.uri="resource:///template_catalog/templates/job4.template"
+dependencies="job2,job3"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
new file mode 100644
index 0000000..0a53e5b
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
@@ -0,0 +1,18 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+#############################################################
+# Define input dataset to be uncompressed and unencrypted
+#############################################################
+gobblin.flow.edge.input.dataset.descriptor.0.codec=NONE
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=NONE
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.0.format=json
+gobblin.flow.edge.output.dataset.descriptor.0.codec=gzip
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=aes_rotating
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=json
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
\ No newline at end of file
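
This flow.conf describes a FlowEdge whose input is uncompressed, unencrypted avro on HDFS and whose output is gzipped, aes_rotating-encrypted json. A minimal sketch (assuming plain HOCON substitution; the compiler's actual resolution path may differ) of how the ${team.name}/${dataset.name} placeholders resolve against user-supplied flow properties:

    Config user = ConfigFactory.parseString("team.name = myteam\ndataset.name = mydataset");
    Config edge = ConfigFactory
        .parseResources("template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf")
        .withFallback(user)
        .resolve();
    // edge.getString("gobblin.flow.edge.input.dataset.descriptor.0.path")
    // now returns "/data/out/myteam/mydataset".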

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
new file mode 100644
index 0000000..cda75cf
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
new file mode 100644
index 0000000..2cbf420
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
@@ -0,0 +1,17 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=json
+gobblin.flow.edge.input.dataset.descriptor.0.codec=gzip
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=aes_rotating
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type=json
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding=base64
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.output.dataset.descriptor.0.platform=adls
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
+gobblin.flow.edge.output.dataset.descriptor.0.codec=${gobblin.flow.edge.input.dataset.descriptor.0.codec}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
new file mode 100644
index 0000000..37d0d9c
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
new file mode 100644
index 0000000..abac6b5
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -0,0 +1,15 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
new file mode 100644
index 0000000..fe627c9
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
new file mode 100644
index 0000000..d0765e1
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
@@ -0,0 +1,9 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
new file mode 100644
index 0000000..fe627c9
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
new file mode 100644
index 0000000..b92a2bf
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
@@ -0,0 +1,65 @@
+# ====================================================================
+# Job configurations
+# ====================================================================
+job.name=Distcp-HDFS-ADL
+
+# team.name and dataset.name are supplied via the flow config
+from=/data/encrypted/${team.name}/${dataset.name}
+to=/data/encrypted/${team.name}/${dataset.name}
+
+# Delete files in the target (ADL) that no longer exist in the source
+gobblin.copy.recursive.update=true
+gobblin.copy.recursive.delete=true
+gobblin.copy.recursive.deleteEmptyDirectories=true
+gobblin.trash.skip.trash=true
+
+# Fail the job if any single dataset fails to copy
+gobblin.copy.abortOnSingleDatasetFailure=true
+
+#gobblin.copy.preserved.attributes=p
+
+#Job properties to be resolved from source and dest data node config.
+fs.uri=${source.data.node.fs.uri}
+source.filebased.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+target.filebased.fs.uri=${destination.data.node.fs.uri}
+writer.fs.uri=${target.filebased.fs.uri}
+
+#ADL parameters
+fs.AbstractFileSystem.adl.impl="org.apache.hadoop.fs.adl.Adl"
+dfs.adls.oauth2.access.token.provider.type=ClientCredential
+dfs.adls.oauth2.refresh.url="https://login.microsoftonline.com/67893-erty-1234-7678-123456/oauth2/token"
+dfs.adls.oauth2.client.id=${adls.oauth2.client.id}
+writer.encrypted.dfs.adls.oauth2.credential=${adls.ouath2.credential}
+
+encrypt.key.loc=/user/${user.to.proxy}/master.password
+work.dir=/tmp/${user.to.proxy}
+writer.user.to.proxy=${adls.user.to.proxy}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+extract.namespace="gobblin.copy"
+
+gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder"
+
+# target location for copy
+data.publisher.final.dir=${to}
+gobblin.dataset.pattern=${from}
+
+data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
+source.class="org.apache.gobblin.data.management.copy.CopySource"
+writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
+converter.classes="org.apache.gobblin.converter.IdentityConverter"
+
+# =======================================
+# Job Parameters to be resolved using SpecExecutor properties
+# =======================================
+type=${specExecInstance.job.type}
+
+job.jars="lib/*"
+job.lock.enabled=false
+job.class=${specExecInstance.job.launcher.class}
+
+# Gobblin Hadoop Parameters
+launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
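
The ${source.data.node.fs.uri}, ${destination.data.node.fs.uri} and ${specExecInstance.*} placeholders in this template stay unresolved until flow compilation, when they are filled in from the data node and SpecExecutor configs. A sketch under that assumption (values invented; setAllowUnresolved leaves user-scoped keys such as ${team.name} for a later pass):

    Config runtime = ConfigFactory.parseString(
        "source.data.node.fs.uri = \"hdfs://namenode:8020\"\n"
        + "destination.data.node.fs.uri = \"adl://account.azuredatalakestore.net\"\n"
        + "specExecInstance.job.type = hadoopJava");
    Config template = ConfigFactory.parseResources(
        "template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template");
    Config resolved = template.withFallback(runtime)
        .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));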

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
new file mode 100644
index 0000000..844dc92
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -0,0 +1,57 @@
+# ====================================================================
+# Job configurations
+# ====================================================================
+job.name=Distcp-HDFS-HDFS
+
+# Source and destination paths to be obtained from flow config.
+from=${gobblin.flow.edge.input.dataset.descriptor.path}
+to=${gobblin.flow.edge.output.dataset.descriptor.path}
+
+# Delete files in the target that no longer exist in the source
+gobblin.copy.recursive.update=true
+gobblin.copy.recursive.delete=true
+gobblin.copy.recursive.deleteEmptyDirectories=true
+gobblin.trash.skip.trash=true
+
+# Fail the job if any single dataset fails to copy
+gobblin.copy.abortOnSingleDatasetFailure=true
+
+#gobblin.copy.preserved.attributes=p
+
+#Job properties to be resolved from source and dest data node config.
+fs.uri=${source.data.node.fs.uri}
+source.filebased.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+target.filebased.fs.uri=${destination.data.node.fs.uri}
+writer.fs.uri=${target.filebased.fs.uri}
+
+work.dir=/tmp/${user.to.proxy}
+writer.user.to.proxy=${adls.user.to.proxy}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+extract.namespace="gobblin.copy"
+
+gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder"
+
+# target location for copy
+data.publisher.final.dir=${to}
+gobblin.dataset.pattern=${from}
+
+data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
+source.class="org.apache.gobblin.data.management.copy.CopySource"
+writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
+converter.classes="org.apache.gobblin.converter.IdentityConverter"
+
+# =======================================
+# Job Parameters to be resolved using SpecExecutor properties
+# =======================================
+type=${specExecInstance.job.type}
+
+job.jars="lib/*"
+job.lock.enabled=false
+job.class=${specExecInstance.job.launcher.class}
+
+# Gobblin Hadoop Parameters
+launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
new file mode 100644
index 0000000..fcc78cd
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
@@ -0,0 +1,42 @@
+# ====================================================================
+# Job configurations (can be changed)
+# ====================================================================
+job.name=convert-to-json-and-encrypt
+job.description="Convert date partitioned avro files to json and encrypt"
+from=/data/out/${team.name}/${dataset.name}
+to=/data/encrypted/${team.name}/${dataset.name}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+
+source.class="org.apache.gobblin.source.DatePartitionedAvroFileSource"
+date.partitioned.source.partition.pattern=yyyy-MM-dd
+date.partitioned.source.min.watermark.value=2017-03-01
+source.filebased.data.directory=${from}
+source.entity=avro
+
+converter.classes="org.apache.gobblin.converter.avro.AvroToJsonStringConverter,org.apache.gobblin.converter.string.StringToBytesConverter"
+
+writer.builder.class="org.apache.gobblin.writer.SimpleDataWriterBuilder"
+writer.output.format=json
+writer.codec.type=gzip
+simple.writer.prepend.size=false
+writer.partitioner.class="org.apache.gobblin.writer.partitioner.WorkUnitStateWriterPartitioner"
+writer.partition.pattern=${date.partitioned.source.partition.pattern}
+
+writer.encrypt.algorithm=aes_rotating
+writer.encrypt.keystore_type=json
+writer.encrypt.keystore_path="hdfs://path/to/keystore/keystore.json"
+
+data.publisher.type="org.apache.gobblin.publisher.BaseDataPublisher"
+data.publisher.appendExtractToFinalDir=false
+data.publisher.metadata.output_file="metadata.json"
+data.publisher.metadata.publish.writer=true
+
+data.publisher.final.dir=${to}
+
+task.maxretries=0
+workunit.retry.enabled=false
+
+qualitychecker.task.policies="org.apache.gobblin.policies.count.RowCountPolicy"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job1.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job1.template b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
new file mode 100644
index 0000000..321e984
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
@@ -0,0 +1,2 @@
+key11=val11
+key12=val12
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job2.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job2.template b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
new file mode 100644
index 0000000..5141d92
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
@@ -0,0 +1,2 @@
+key21=val21
+key22=val22
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job3.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job3.template b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
new file mode 100644
index 0000000..c192cc4
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
@@ -0,0 +1,2 @@
+key31=val31
+key32=val32
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job4.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job4.template b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
new file mode 100644
index 0000000..a6a508e
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
@@ -0,0 +1,2 @@
+key41=val41
+key42=val42
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf b/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
index a13af7d..85e686b 100644
--- a/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
@@ -1,16 +1,22 @@
-gobblin.flow.dataset.descriptor.input.0.class=org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor
-gobblin.flow.dataset.descriptor.input.0.path=/data/inbound/<TEAM_NAME>/<DATASET_NAME>
-gobblin.flow.dataset.descriptor.input.0.format=avro
+team.name=test-team
+dataset.name=test-dataset
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/inbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
 
-gobblin.flow.dataset.descriptor.output.0.class=${gobblin.flow.dataset.descriptor.input.0.class}
-gobblin.flow.dataset.descriptor.output.0.path=${gobblin.flow.dataset.descriptor.input.0.path}
-gobblin.flow.dataset.descriptor.output.0.format=${gobblin.flow.dataset.descriptor.input.0.format}
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
 
-gobblin.flow.dataset.descriptor.input.1.class=org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor
-gobblin.flow.dataset.descriptor.input.1.path=/data/outbound/<TEAM_NAME>/<DATASET_NAME>
-gobblin.flow.dataset.descriptor.input.1.format=avro
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/outbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.1.format=avro
 
-gobblin.flow.dataset.descriptor.output.1.class=${gobblin.flow.dataset.descriptor.input.1.class}
-gobblin.flow.dataset.descriptor.output.1.path=${gobblin.flow.dataset.descriptor.input.1.path}
-gobblin.flow.dataset.descriptor.output.1.format=${gobblin.flow.dataset.descriptor.input.1.format}
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+gobblin.flow.edge.output.dataset.descriptor.1.format=${gobblin.flow.edge.input.dataset.descriptor.1.format}
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
deleted file mode 100644
index 4a59fcc..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
+++ /dev/null
@@ -1,2 +0,0 @@
-key11=val11
-key12=val12

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
new file mode 100644
index 0000000..00e274e
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
@@ -0,0 +1 @@
+gobblin.template.uri=resource:///template_catalog/templates/job1.template

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
deleted file mode 100644
index f174940..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies=job1
-key21=val21
-key22=val22
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
new file mode 100644
index 0000000..c4db05f
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
@@ -0,0 +1,3 @@
+gobblin.template.uri=resource:///template_catalog/templates/job2.template
+
+dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
deleted file mode 100644
index fda7f39..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies=job1
-key31=val31
-key32=val32
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
new file mode 100644
index 0000000..59867b3
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
@@ -0,0 +1,2 @@
+gobblin.template.uri=resource:///template_catalog/templates/job3.template
+dependencies=job1

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
deleted file mode 100644
index c5ef881..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies="job2,job3"
-key41=val41
-key42=val42
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
new file mode 100644
index 0000000..8fdc611
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
@@ -0,0 +1,2 @@
+gobblin.template.uri=resource:///template_catalog/templates/job4.template
+dependencies=job2,job3


[3/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Posted by hu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
new file mode 100644
index 0000000..c6f5c74
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+
+/**
+ * An implementation of an ADL (Azure Data Lake) {@link org.apache.gobblin.service.modules.flowgraph.DataNode}.
+ */
+public class AdlsDataNode extends FileSystemDataNode {
+  public static final String ADLS_SCHEME = "adl";
+
+  public AdlsDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * @param fsUri FileSystem URI
+   * @return true if the scheme is "adl" and authority is not empty.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    //Check that the scheme is "adl"
+    if (!scheme.equals(ADLS_SCHEME)) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+}
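
A minimal construction sketch (not in this patch). The exact key names depend on FlowGraphConfigurationKeys.DATA_NODE_PREFIX; "data.node." is assumed here:

    Config nodeProps = ConfigFactory.parseString(
        "data.node.id = adl-1\n"                                         // assumed id key
        + "data.node.fs.uri = \"adl://account.azuredatalakestore.net\""); // assumed fs.uri key
    AdlsDataNode node = new AdlsDataNode(nodeProps); // throws DataNodeCreationException if isUriValid() fails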

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
new file mode 100644
index 0000000..72f1a66
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, a {@link FileSystemDataNode}
+ * must have an FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode} and {@link LocalFSDataNode}.
+ */
+@Alpha
+public abstract class FileSystemDataNode extends BaseDataNode {
+  public static final String FS_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri";
+
+  @Getter
+  private String fsUri;
+
+  /**
+   * Constructor. A {@link FileSystemDataNode} must have the fs.uri property specified in addition to a node Id.
+   */
+  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "fs.uri cannot be null or empty.");
+
+      //Validate the srcFsUri and destFsUri of the DataNode.
+      if (!isUriValid(new URI(this.fsUri))) {
+        throw new IOException("Invalid FS URI " + this.fsUri);
+      }
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  public abstract boolean isUriValid(URI fsUri);
+  /**
+   * Two {@link FileSystemDataNode}s are equal if they have the same id and the same fsUri.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FileSystemDataNode that = (FileSystemDataNode) o;
+
+    return this.getId().equals(that.getId()) && this.fsUri.equals(that.getFsUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
+  }
+}
\ No newline at end of file
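
A hypothetical subclass sketch (not in this patch) showing the extension point: a concrete node only validates its own URI scheme, while FileSystemDataNode handles fs.uri extraction, equality and hashing:

    public class S3DataNode extends FileSystemDataNode {
      public static final String S3_SCHEME = "s3a";

      public S3DataNode(Config nodeProps) throws DataNodeCreationException {
        super(nodeProps);
      }

      @Override
      public boolean isUriValid(URI fsUri) {
        // Scheme must be s3a and the bucket (authority) must be present.
        return S3_SCHEME.equals(fsUri.getScheme()) && !Strings.isNullOrEmpty(fsUri.getAuthority());
      }
    }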

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
new file mode 100644
index 0000000..5402074
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An HDFS implementation of {@link FileSystemDataNode}. All the properties specific to an HDFS-based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class HdfsDataNode extends FileSystemDataNode {
+  public static final String HDFS_SCHEME = "hdfs";
+
+  public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme is "hdfs" and authority is not empty.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    //Check that the scheme is "hdfs"
+    if (!scheme.equals(HDFS_SCHEME)) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
new file mode 100644
index 0000000..757d4a0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.typesafe.config.Config;
+
+/**
+ * A local filesystem implementation of {@link FileSystemDataNode}. All the properties specific to a local FS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+  public static final String LOCAL_FS_SCHEME = "file";
+
+  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme of fsUri equals "file"
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    if (scheme.equals(LOCAL_FS_SCHEME)) {
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
new file mode 100644
index 0000000..c0c9297
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A data class that encapsulates information for executing a job. This includes a {@link JobSpec} and a {@link SpecExecutor}
+ * where the {@link JobSpec} will be executed.
+ */
+@Data
+@AllArgsConstructor
+public class JobExecutionPlan {
+  private JobSpec jobSpec;
+  private SpecExecutor specExecutor;
+
+  public static class Factory {
+
+    public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId)
+        throws URISyntaxException {
+      JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId);
+      return new JobExecutionPlan(jobSpec, specExecutor);
+    }
+
+    /**
+     * Given a resolved job config, this helper method converts the config to a {@link JobSpec}.
+     * @param jobConfig resolved job config.
+     * @param flowSpec input FlowSpec.
+     * @return a {@link JobSpec} corresponding to the resolved job config.
+     */
+    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long flowExecutionId) throws URISyntaxException {
+      Config flowConfig = flowSpec.getConfig();
+
+      String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
+      String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
+      String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
+
+      //Qualify the job name with the flow group and flow name: flowGroup:flowName:jobName.
+      jobName = Joiner.on(":").join(flowGroup, flowName, jobName);
+
+      JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
+          .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
+
+      //Get job template uri
+      URI jobTemplateUri = new URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
+      JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
+
+      //Add flowName to job spec
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)));
+
+      //Add job name
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
+
+      //Add flow execution id
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+      // Remove schedule
+      jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+      // Add job.name and job.group
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)));
+
+      //Enable job lock for each job to prevent concurrent executions.
+      jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)));
+
+      // Reset properties in Spec from Config
+      jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
+
+      return jobSpec;
+    }
+
+
+    /**
+     * A naive implementation of generating a jobSpec's URI within a multi-hop flow that follows the convention:
+     * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link ConfigurationKeys#JOB_NAME_KEY}.
+     */
+    public static URI jobSpecURIGenerator(String jobGroup, String jobName, FlowSpec flowSpec)
+        throws URISyntaxException {
+      return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(),
+          StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"), "/") + jobGroup
+              + "/" + jobName, null);
+    }
+  }
+}
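
A usage sketch (flowSpec, jobConfig and specExecutor are assumed to be supplied by the flow compiler):

    JobExecutionPlan plan = new JobExecutionPlan.Factory()
        .createPlan(flowSpec, jobConfig, specExecutor, System.currentTimeMillis());
    // The resulting JobSpec is named "<flowGroup>:<flowName>:<jobName>" and its
    // config carries flow.name, flow.executionId, job.name and job.group.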

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
new file mode 100644
index 0000000..f942f8d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link JobExecutionPlan}s from
+ * a {@link List} of {@link JobExecutionPlan}s.
+ */
+@Alpha
+@Slf4j
+public class JobExecutionPlanDagFactory {
+
+  public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) {
+    //Maintain a mapping between job name and the corresponding JobExecutionPlan.
+    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap = new HashMap<>();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    /**
+     * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node
+     * to a HashMap.
+     */
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+      dagNodeList.add(dagNode);
+      String jobName = getJobName(jobExecutionPlan);
+      if (jobName != null) {
+        jobExecutionPlanMap.put(jobName, dagNode);
+      }
+    }
+
+    /**
+     * Iterate over each {@link JobSpec} to get the dependencies of each {@link JobSpec}.
+     * For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and
+     * set the {@link Dag.DagNode}s corresponding to its dependencies as its parent nodes.
+     *
+     * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
+     */
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      String jobName = getJobName(jobExecutionPlan);
+      if (jobName == null) {
+        continue;
+      }
+      Dag.DagNode<JobExecutionPlan> node = jobExecutionPlanMap.get(jobName);
+      Collection<String> dependencies = getDependencies(jobExecutionPlan.getJobSpec().getConfig());
+      for (String dependency : dependencies) {
+        Dag.DagNode<JobExecutionPlan> parentNode = jobExecutionPlanMap.get(dependency);
+        node.addParentNode(parentNode);
+      }
+    }
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    return dag;
+  }
+
+  /**
+   * Get job dependencies of a given job from its config.
+   * @param config of a job.
+   * @return a list of dependencies of the job.
+   */
+  private static List<String> getDependencies(Config config) {
+    return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
+        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
+  }
+
+  /**
+   * The job name is derived from the {@link org.apache.gobblin.runtime.api.JobTemplate} URI. It is the
+   * simple name of the path component of the URI.
+   * @param jobExecutionPlan the {@link JobExecutionPlan} whose job name is to be derived.
+   * @return the simple name from the URI path.
+   */
+  private static String getJobName(JobExecutionPlan jobExecutionPlan) {
+    Optional<URI> jobTemplateUri = jobExecutionPlan.getJobSpec().getTemplateURI();
+    if (jobTemplateUri.isPresent()) {
+      return Files.getNameWithoutExtension(new Path(jobTemplateUri.get()).getName());
+    } else {
+      return null;
+    }
+  }
+}
\ No newline at end of file
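
A minimal usage sketch for the factory above (not part of the patch): it wires three
hypothetical jobs, where job3 names the other two via ConfigurationKeys.JOB_DEPENDENCIES.
The JobExecutionPlan(JobSpec, SpecExecutor) and InMemorySpecExecutor(Config) constructors
are assumed from classes introduced or used elsewhere in this commit; imports and
exception handling are omitted.

    // Hypothetical job names and template URIs.
    SpecExecutor executor = new InMemorySpecExecutor(ConfigFactory.empty());
    List<JobExecutionPlan> plans = new ArrayList<>();
    for (String name : Arrays.asList("job1", "job2", "job3")) {
      // job3 depends on job1 and job2; the value is a comma-separated list of job names.
      Config jobConfig = "job3".equals(name)
          ? ConfigFactory.parseString(ConfigurationKeys.JOB_DEPENDENCIES + "=\"job1,job2\"")
          : ConfigFactory.empty();
      JobSpec jobSpec = JobSpec.builder("flow/" + name)
          .withConfig(jobConfig)
          // The template URI's simple file name determines the job name used for matching.
          .withTemplate(URI.create("FS:///flowEdgeTemplate/jobs/" + name + ".job"))
          .build();
      plans.add(new JobExecutionPlan(jobSpec, executor));
    }
    Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(plans);
    // The DagNode for job3 now has the nodes for job1 and job2 as its parents.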

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index 30d8309..a8350fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -20,16 +20,16 @@ package org.apache.gobblin.service.modules.template;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.typesafe.config.Config;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 
 /**
  * An interface primarily for representing a flow of {@link JobTemplate}s. It also has
@@ -45,18 +45,37 @@ public interface FlowTemplate extends Spec {
 
   /**
    *
-   * @return the {@link Dag<JobTemplate>} that backs the {@link FlowTemplate}.
+   * @return all configuration inside pre-written template.
    */
-  Dag<JobTemplate> getDag() throws IOException;
+  Config getRawTemplateConfig();
 
   /**
-   *
-   * @return all configuration inside pre-written template.
+   * @param userConfig user-supplied configuration attributes.
+   * @return a list of input/output {@link DatasetDescriptor} pairs for which the {@link FlowTemplate} is fully
+   * resolvable using the provided userConfig.
    */
-  Config getRawTemplateConfig();
+  List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException;
+
+  /**
+   * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable.
+   * @param userConfig User supplied Config
+   * @param inputDescriptor input {@link DatasetDescriptor}
+   * @param outputDescriptor output {@link DatasetDescriptor}
+   * @return true if the {@link FlowTemplate} is resolvable
+   */
+  boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException;
 
   /**
-   * @return list of input/output {@link DatasetDescriptor}s for the {@link FlowTemplate}.
+   * Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} and returns a {@link List} of resolved
+   * job {@link Config}s.
+   * @param userConfig User supplied Config
+   * @param inputDescriptor input {@link DatasetDescriptor}
+   * @param outputDescriptor output {@link DatasetDescriptor}
+   * @return a list of resolved job {@link Config}s.
    */
-  List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors();
+  List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException;
 }
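
A hedged sketch of how a flow compiler might drive this interface (the catalog lookup,
config key, and values are hypothetical; checked exceptions are omitted for brevity):

    FlowTemplate flowTemplate = catalog.getFlowTemplate(URI.create("FS:///flowEdgeTemplate"));
    Config userConfig = ConfigFactory.parseString("key1=value1");  // user-supplied attributes

    // Only descriptor pairs for which the template fully resolves are returned.
    for (Pair<DatasetDescriptor, DatasetDescriptor> io
        : flowTemplate.getResolvingDatasetDescriptors(userConfig)) {
      List<Config> jobConfigs =
          flowTemplate.getResolvedJobConfigs(userConfig, io.getLeft(), io.getRight());
      // Each resolved Config can back one JobSpec of the compiled flow.
    }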

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
index 553f067..ba9c091 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
@@ -43,15 +44,15 @@ public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate {
   public static final String VERSION_KEY = "gobblin.flow.template.version";
   private static final String DEFAULT_VERSION = "1";
 
-  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog)
-      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException {
     this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)).resolve(
-        ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, catalog);
+        ConfigResolveOptions.defaults().setAllowUnresolved(true)), flowTemplateDirUri, catalog);
   }
 
-  public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog)
-      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
-    super(uri, ConfigUtils.getString(config, VERSION_KEY, DEFAULT_VERSION),
+  public HOCONInputStreamFlowTemplate(Config config, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException {
+    super(flowTemplateDirUri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
         config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config
             .getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, catalog);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
deleted file mode 100644
index b89da4d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from
- * a {@link URI} of a {@link FlowTemplate}.
- */
-@Alpha
-@Slf4j
-public class JobTemplateDagFactory {
-  public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
-
-  public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) {
-    Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
-    List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>();
-    /**
-     * Create a {@link Dag.DagNode<JobTemplate>} for every {@link JobTemplate} in the flow. Add this node
-     * to a {@link Map<URI,JobTemplate>}.
-     */
-    for (JobTemplate template : jobTemplates) {
-      Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
-      dagNodeList.add(dagNode);
-      uriJobTemplateMap.put(template.getUri(), dagNode);
-    }
-
-    /**
-     * Iterate over each {@link JobTemplate} to get the dependencies of each {@link JobTemplate}.
-     * For each {@link JobTemplate}, get the corresponding {@link Dag.DagNode} and
-     * set the {@link Dag.DagNode}s corresponding to the dependencies as its parent nodes.
-     *
-     * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
-     */
-    Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
-    for (JobTemplate template : jobTemplates) {
-      URI templateUri = template.getUri();
-      Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
-      Collection<String> dependencies = template.getDependencies();
-      for (String dependency : dependencies) {
-        URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
-        Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri);
-        node.addParentNode(parentNode);
-      }
-    }
-    Dag<JobTemplate> dag = new Dag<>(dagNodeList);
-    return dag;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 46f99d3..5f8dfda 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -19,38 +19,40 @@ package org.apache.gobblin.service.modules.template;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigResolveOptions;
+import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.hadoop.fs.Path;
-
-import lombok.Getter;
 
 
 /**
  * A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template.
  */
 @Alpha
+@Slf4j
 public class StaticFlowTemplate implements FlowTemplate {
   private static final long serialVersionUID = 84641624233978L;
 
-  public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input";
-  public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output";
-  public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
-
   @Getter
   private URI uri;
   @Getter
@@ -60,69 +62,74 @@ public class StaticFlowTemplate implements FlowTemplate {
   @Getter
   private transient FlowCatalogWithTemplates catalog;
   @Getter
-  private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors;
-  @Getter
   private List<JobTemplate> jobTemplates;
 
-  private transient Dag<JobTemplate> dag;
-
   private transient Config rawConfig;
-  private boolean isTemplateMaterialized;
 
-  public StaticFlowTemplate(URI uri, String version, String description, Config config,
+  public StaticFlowTemplate(URI flowTemplateDirUri, String version, String description, Config config,
       FlowCatalogWithTemplates catalog)
-      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
-    this.uri = uri;
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    this.uri = flowTemplateDirUri;
     this.version = version;
     this.description = description;
-    this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
     this.rawConfig = config;
     this.catalog = catalog;
-    URI flowTemplateDir = new Path(this.uri).getParent().toUri();
-    this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+    this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDirUri);
   }
 
   //Constructor for testing purposes
-  public StaticFlowTemplate(URI uri, String version, String description, Config config,
-      FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates)
-      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+  public StaticFlowTemplate(URI uri, String version, String description, Config config, FlowCatalogWithTemplates catalog, List<JobTemplate> jobTemplates) {
     this.uri = uri;
     this.version = version;
     this.description = description;
-    this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
     this.rawConfig = config;
     this.catalog = catalog;
     this.jobTemplates = jobTemplates;
   }
 
+
   /**
    * Generate the input/output dataset descriptors for the {@link FlowTemplate}.
+   * @param userConfig user-supplied configuration attributes.
+   * @return a list of input/output {@link DatasetDescriptor} pairs that resolve this {@link FlowTemplate}.
    */
-  private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config)
-      throws IOException, ReflectiveOperationException {
-    if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+  @Override
+  public List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException {
+    Config config = this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+    if (!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX)
+        || !config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
       throw new IOException("Flow template must specify at least one input/output dataset descriptor");
     }
+
     int i = 0;
-    String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+    String inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
     List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList();
     while (config.hasPath(inputPrefix)) {
-      Config inputDescriptorConfig = config.getConfig(inputPrefix);
-      DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig);
-      String outputPrefix = Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i++));
-      Config outputDescriptorConfig = config.getConfig(outputPrefix);
-      DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig);
-      result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
-      inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+      try {
+        Config inputDescriptorConfig = config.getConfig(inputPrefix);
+        DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig);
+        String outputPrefix = Joiner.on(".")
+            .join(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+        Config outputDescriptorConfig = config.getConfig(outputPrefix);
+        DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig);
+
+        if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) {
+          result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+        }
+      } catch (ReflectiveOperationException e) {
+        //Cannot instantiate I/O dataset descriptor due to missing config; skip and try the next one.
+        log.debug("Skipping dataset descriptor pair at index {}", i, e);
+      }
+      inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(++i));
     }
     return result;
   }
 
   private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
       throws ReflectiveOperationException {
-    Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
-    return (DatasetDescriptor) GobblinConstructorUtils
-        .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+    Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+    return (DatasetDescriptor) GobblinConstructorUtils.invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
   }
 
   @Override
@@ -130,27 +137,53 @@ public class StaticFlowTemplate implements FlowTemplate {
     return this.rawConfig;
   }
 
-  private void ensureTemplateMaterialized()
-      throws IOException {
-    try {
-      if (!isTemplateMaterialized) {
-        this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
-      }
-      this.isTemplateMaterialized = true;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
   @Override
   public List<JobTemplate> getJobTemplates() {
     return this.jobTemplates;
   }
 
+  private Config getResolvedFlowConfig(Config userConfig) {
+    return userConfig.withFallback(this.rawConfig);
+  }
+
+  /**
+   * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable.
+   * @param userConfig User supplied Config
+   * @return true if the {@link FlowTemplate} is resolvable
+   */
   @Override
-  public Dag<JobTemplate> getDag()
-      throws IOException {
-    ensureTemplateMaterialized();
-    return this.dag;
+  public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    ConfigResolveOptions resolveOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+
+    for (JobTemplate template : this.jobTemplates) {
+      //Resolve each job template once and check that no substitutions remain unresolved.
+      Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
+      if (!templateConfig.isResolved()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor,
+      DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    List<Config> resolvedJobConfigs = new ArrayList<>();
+    for (JobTemplate jobTemplate : getJobTemplates()) {
+      Config resolvedJobConfig = jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
+          ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
+      resolvedJobConfigs.add(resolvedJobConfig);
+    }
+    return resolvedJobConfigs;
   }
 }
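
The resolvability check above relies on Typesafe Config's partial resolution: resolve with
setAllowUnresolved(true), then test isResolved(). A self-contained sketch of that idiom
(key names hypothetical):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigResolveOptions;

    public class PartialResolutionSketch {
      public static void main(String[] args) {
        // A template with an unresolved substitution, plus user-supplied values.
        Config template = ConfigFactory.parseString("from=${source.path}\nto=\"/data/out\"");
        Config userConfig = ConfigFactory.parseString("source.path=\"/data/in\"");

        ConfigResolveOptions allowUnresolved = ConfigResolveOptions.defaults().setAllowUnresolved(true);

        // With the user config as fallback, every substitution resolves.
        System.out.println(userConfig.withFallback(template).resolve(allowUnresolved).isResolved()); // true

        // Without it, resolution is partial and isResolved() reports false.
        System.out.println(template.resolve(allowUnresolved).isResolved()); // false
      }
    }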

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
index 5dba91c..59c3c87 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
+import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,17 +48,22 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
 import org.apache.gobblin.util.PathUtils;
 
+
 /**
  * An implementation of a catalog for {@link FlowTemplate}s. Provides basic API for retrieving a {@link FlowTemplate}
  * from the catalog and for retrieving {@link JobTemplate}s that are part of a {@link FlowTemplate}.
  * The flow and job configuration files are assumed to have the following path structure:
- * <p> /path/to/template/catalog/flowName/flow.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job1.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job2.(conf|pull) </p>
+ * <p> /path/to/template/catalog/flowName/flow.conf </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p>
  */
 @Alpha
 public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
-  public static final String JOB_TEMPLATE_DIR_NAME="jobs";
+  public static final String JOBS_DIR_NAME = "jobs";
+  public static final String FLOW_CONF_FILE_NAME = "flow.conf";
+  public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", ".template");
+  public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri";
+
   protected static final String FS_SCHEME = "FS";
 
   /**
@@ -59,64 +71,94 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
    * @param sysConfig that must contain the fully qualified path of the flow template catalog
    * @throws IOException
    */
-  public FSFlowCatalog(Config sysConfig) throws IOException {
-    super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
+  public FSFlowCatalog(Config sysConfig)
+      throws IOException {
+    super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
   }
 
   /**
    *
-   * @param flowUri URI of the flow configuration file
+   * @param flowTemplateDirURI URI of the flow template directory
    * @return a {@link FlowTemplate}
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    * @throws IOException
    */
-  public FlowTemplate getFlowTemplate(URI flowUri) throws SpecNotFoundException, JobTemplate.TemplateException, IOException {
+  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+      throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
     if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
       throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     }
-    if (!flowUri.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowUri.getScheme());
+    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+      throw new RuntimeException(
+          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
     }
     String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     // path of uri is location of template file relative to the job configuration root directory
-    Path templateFullPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowUri.getPath()));
+    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI.getPath()));
+    Path templateFullPath = PathUtils.mergePaths(templateDirPath, new Path(FLOW_CONF_FILE_NAME));
     FileSystem fs = FileSystem.get(templateFullPath.toUri(), new Configuration());
 
     try (InputStream is = fs.open(templateFullPath)) {
-      return new HOCONInputStreamFlowTemplate(is, flowUri, this);
-    } catch (ReflectiveOperationException e) {
-      throw new RuntimeException(e);
+      return new HOCONInputStreamFlowTemplate(is, flowTemplateDirURI, this);
     }
   }
 
   /**
    *
-   * @param flowTemplateDirUri URI of the flow template directory
+   * @param flowTemplateDirURI URI of the flow template directory
    * @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}.
    * @throws IOException
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    */
-  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirUri)
-      throws IOException, SpecNotFoundException, JobTemplate.TemplateException {
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+
+    PathFilter extensionFilter = file -> {
+      for (String extension : JOB_FILE_EXTENSIONS) {
+        if (file.getName().endsWith(extension)) {
+          return true;
+        }
+      }
+      return false;
+    };
+
     if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
       throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     }
-    if (!flowTemplateDirUri.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirUri.getScheme());
+    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+      throw new RuntimeException(
+          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
     }
     List<JobTemplate> jobTemplates = new ArrayList<>();
 
     String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
-    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirUri));
-    Path jobTemplatePath = new Path(templateDirPath, JOB_TEMPLATE_DIR_NAME);
+    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI));
+    Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME);
     FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new Configuration());
-    for (FileStatus fileStatus : fs.listStatus(jobTemplatePath)) {
-      try (InputStream is = fs.open(fileStatus.getPath())) {
-        jobTemplates.add(new HOCONInputStreamJobTemplate(is, fileStatus.getPath().toUri(), this));
+
+    for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, extensionFilter)) {
+      Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+      if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) {
+        URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY));
+        //Strip out the initial "/"
+        URI actualResourceUri = new URI(templateUri.getPath().substring(1));
+        Path fullTemplatePath =
+            new Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI());
+        templateConfig = templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true));
       }
+      jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, fileStatus.getPath().toUri(), this));
     }
     return jobTemplates;
   }
+
+  private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved)
+      throws IOException {
+    ConfigResolveOptions options = ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved);
+    try (InputStream is = fs.open(filePath)) {
+      return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
+    }
+  }
 }
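
A short sketch of standing up this catalog and fetching a template (the catalog path is
hypothetical; the "FS" scheme and directory layout follow the class above, and checked
exceptions are omitted):

    Properties props = new Properties();
    props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
        "file:///path/to/template_catalog");
    FSFlowCatalog catalog = new FSFlowCatalog(ConfigFactory.parseProperties(props));

    // The URI names a flow template directory under the catalog root; the catalog reads
    // flow.conf there and the .job/.template files under its jobs/ subdirectory.
    FlowTemplate template = catalog.getFlowTemplate(URI.create("FS:///flowEdgeTemplate"));
    List<JobTemplate> jobTemplates = template.getJobTemplates();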

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
index 41c0c9e..8bafe97 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
 import org.apache.gobblin.runtime.api.JobTemplate;
@@ -34,16 +35,18 @@ public interface FlowCatalogWithTemplates {
    * Get {@link FlowTemplate} with given {@link URI}.
    * @throws SpecNotFoundException if a {@link JobTemplate} with given {@link URI} cannot be found.
    */
-  FlowTemplate getFlowTemplate(URI uri) throws SpecNotFoundException, IOException, JobTemplate.TemplateException;
+  FlowTemplate getFlowTemplate(URI uri)
+      throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException;
 
   /**
    *
-   * @param flowUri
+   * @param flowTemplateDirURI URI of the flow template directory.
    * @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}.
    * @throws IOException
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    */
-  public List<JobTemplate> getJobTemplatesForFlow(URI flowUri) throws IOException, SpecNotFoundException, JobTemplate.TemplateException;
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
new file mode 100644
index 0000000..b5451bc
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class GitFlowGraphMonitorTest {
+  private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitorTest.class);
+  private Repository remoteRepo;
+  private Git gitForPush;
+  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
+  private final File remoteDir = new File(TEST_DIR + "/remote");
+  private final File cloneDir = new File(TEST_DIR + "/clone");
+  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(flowGraphDir, "node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(flowGraphDir, "node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private FSFlowCatalog flowCatalog;
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
+    this.remoteRepo = fileKey.open(false);
+    this.remoteRepo.create(true);
+
+    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+    // push an empty commit as a base for detecting changes
+    this.gitForPush.commit().setMessage("First commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+        .build();
+
+    // Create a FSFlowCatalog instance
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new BaseFlowGraph();
+
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor.setActive(true);
+  }
+
+  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
+      throws IOException, GitAPIException {
+    // push a new node file
+    nodeDir.mkdirs();
+    nodeFile.createNewFile();
+    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+
+    // add, commit, push node
+    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Node commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if the node has been added to the FlowGraph
+    DataNode dataNode = this.flowGraph.getNode(nodeId);
+    Assert.assertEquals(dataNode.getId(), nodeId);
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), paramValue);
+  }
+
+  @Test
+  public void testAddNode()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
+    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testAddEdge()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    // push a new edge file
+    this.edge1Dir.mkdirs();
+    this.edge1File.createNewFile();
+
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getSrc(), "node1");
+    Assert.assertEquals(flowEdge.getDest(), "node2");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testUpdateEdge()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    //Update edge1 file
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
+        + "key1=value1\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if new edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getSrc(), "node1");
+    Assert.assertEquals(flowEdge.getDest(), "node2");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1");
+  }
+
+  @Test (dependsOnMethods = "testUpdateEdge")
+  public void testUpdateNode()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    //Update param1 value in node1 and check if updated node is added to the graph
+    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
+  }
+
+
+  @Test (dependsOnMethods = "testUpdateNode")
+  public void testRemoveEdge() throws GitAPIException, IOException {
+    // delete a config file
+    edge1File.delete();
+
+    //Node1 has 1 edge before delete
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+
+    // delete, commit, push
+    this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
+        this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge remove commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been deleted from the graph
+    edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRemoveEdge")
+  public void testRemoveNode() throws GitAPIException, IOException {
+    //delete node file
+    node1File.delete();
+
+    //node1 is present in the graph before delete
+    DataNode node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+
+    // delete, commit, push
+    this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
+    this.gitForPush.commit().setMessage("Node remove commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been deleted from the graph
+    node1 = this.flowGraph.getNode("node1");
+    Assert.assertNull(node1);
+  }
+
+
+  private void cleanUpDir(String dir) {
+    File specStoreDir = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (specStoreDir.exists()) {
+          FileUtils.deleteDirectory(specStoreDir);
+        }
+        // if delete succeeded then break out of loop
+        break;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+      }
+    }
+  }
+
+  private String formNodeFilePath(String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    cleanUpDir(TEST_DIR);
+  }
+}
\ No newline at end of file


[4/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Posted by hu...@apache.org.
[GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Closes #2393 from sv2000/multiHopCompiler


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/22a951f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/22a951f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/22a951f0

Branch: refs/heads/master
Commit: 22a951f0a4ac0c963e99cd2a15989c62a08c81cf
Parents: 33d4fea
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jul 30 09:57:31 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jul 30 09:57:31 2018 -0700

----------------------------------------------------------------------
 .../template/HOCONInputStreamJobTemplate.java   |   2 +-
 .../modules/core/GitMonitoringService.java      |   2 +-
 .../dataset/BaseHdfsDatasetDescriptor.java      |  98 -----
 .../modules/dataset/DatasetDescriptor.java      |  38 +-
 .../modules/dataset/EncryptionConfig.java       |  90 ++++
 .../modules/dataset/FSDatasetDescriptor.java    | 138 ++++++
 .../service/modules/dataset/FormatConfig.java   | 102 +++++
 .../modules/dataset/HdfsDatasetDescriptor.java  |  40 --
 .../service/modules/flow/FlowEdgeContext.java   |  46 ++
 .../service/modules/flow/FlowGraphPath.java     |  90 ++++
 .../modules/flow/FlowGraphPathFinder.java       | 320 ++++++++++++++
 .../modules/flow/MultiHopFlowCompiler.java      | 157 +++++++
 .../service/modules/flowgraph/BaseDataNode.java |   8 +-
 .../service/modules/flowgraph/BaseFlowEdge.java |  20 +-
 .../modules/flowgraph/BaseFlowGraph.java        |   8 +-
 .../gobblin/service/modules/flowgraph/Dag.java  |  10 +-
 .../service/modules/flowgraph/DataNode.java     |   3 +-
 .../flowgraph/DatasetDescriptorConfigKeys.java  |  18 +
 .../modules/flowgraph/FileSystemDataNode.java   |  83 ----
 .../service/modules/flowgraph/FlowEdge.java     |  12 +-
 .../service/modules/flowgraph/FlowGraph.java    |   7 +
 .../flowgraph/FlowGraphConfigurationKeys.java   |   8 +-
 .../service/modules/flowgraph/HdfsDataNode.java |  59 ---
 .../modules/flowgraph/LocalFSDataNode.java      |  51 ---
 .../flowgraph/datanodes/fs/AdlsDataNode.java    |  52 +++
 .../datanodes/fs/FileSystemDataNode.java        |  87 ++++
 .../flowgraph/datanodes/fs/HdfsDataNode.java    |  58 +++
 .../flowgraph/datanodes/fs/LocalFSDataNode.java |  51 +++
 .../service/modules/spec/JobExecutionPlan.java  | 117 ++++++
 .../spec/JobExecutionPlanDagFactory.java        | 114 +++++
 .../service/modules/template/FlowTemplate.java  |  39 +-
 .../template/HOCONInputStreamFlowTemplate.java  |  13 +-
 .../modules/template/JobTemplateDagFactory.java |  79 ----
 .../modules/template/StaticFlowTemplate.java    | 143 ++++---
 .../modules/template_catalog/FSFlowCatalog.java |  90 ++--
 .../FlowCatalogWithTemplates.java               |   9 +-
 .../modules/core/GitFlowGraphMonitorTest.java   | 314 ++++++++++++++
 .../modules/flow/FlowGraphPathFinderTest.java   | 417 +++++++++++++++++++
 .../flowgraph/BaseFlowEdgeFactoryTest.java      |  74 ++++
 .../modules/flowgraph/BaseFlowGraphTest.java    |  13 +-
 .../spec/JobExecutionPlanDagFactoryTest.java    | 116 ++++++
 .../template/JobTemplateDagFactoryTest.java     |  92 ----
 .../template_catalog/FSFlowCatalogTest.java     |  53 ++-
 .../src/test/resources/flow/flow.conf           |  24 ++
 .../datanodes/AdlsDataNode-1.properties         |   3 +
 .../datanodes/HdfsDataNode-1.properties         |   3 +
 .../datanodes/HdfsDataNode-2.properties         |   3 +
 .../datanodes/HdfsDataNode-3.properties         |   3 +
 .../datanodes/HdfsDataNode-4.properties         |   3 +
 .../datanodes/LocalFsDataNode-1.properties      |   3 +
 .../hdfs-1-to-hdfs-1-encrypt.properties         |   9 +
 .../flowedges/hdfs-1-to-hdfs-3.properties       |  10 +
 .../hdfs-2-to-hdfs-2-encrypt.properties         |   9 +
 .../flowedges/hdfs-2-to-hdfs-4.properties       |   9 +
 .../flowedges/hdfs-3-to-adls-1.properties       |  13 +
 .../flowedges/hdfs-4-to-adls-1.properties       |  13 +
 .../flowedges/local-to-hdfs-1.properties        |   9 +
 .../flowedges/local-to-hdfs-2.properties        |   9 +
 .../modules/core/GitFlowGraphMonitorTest.java   | 314 --------------
 .../flowgraph/BaseFlowEdgeFactoryTest.java      |  73 ----
 .../template_catalog/flowEdgeTemplate/flow.conf |  20 +
 .../flowEdgeTemplate/jobs/job1.job              |   1 +
 .../flowEdgeTemplate/jobs/job2.job              |   3 +
 .../flowEdgeTemplate/jobs/job3.job              |   2 +
 .../flowEdgeTemplate/jobs/job4.job              |   2 +
 .../hdfsConvertToJsonAndEncrypt/flow.conf       |  18 +
 .../jobs/hdfs-encrypt-avro-to-json.job          |   1 +
 .../flowEdgeTemplates/hdfsToAdl/flow.conf       |  18 +
 .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job       |   1 +
 .../flowEdgeTemplates/hdfsToHdfs/flow.conf      |  15 +
 .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job     |   1 +
 .../flowEdgeTemplates/localToHdfs/flow.conf     |   9 +
 .../localToHdfs/jobs/distcp-local-to-hdfs.job   |   1 +
 .../distcp-push-hdfs-to-adl.template            |  65 +++
 .../multihop/jobTemplates/distcp.template       |  57 +++
 .../hdfs-convert-to-json-and-encrypt.template   |  42 ++
 .../template_catalog/templates/job1.template    |   2 +
 .../template_catalog/templates/job2.template    |   2 +
 .../template_catalog/templates/job3.template    |   2 +
 .../template_catalog/templates/job4.template    |   2 +
 .../template_catalog/test-template/flow.conf    |  30 +-
 .../test-template/jobs/job1.conf                |   2 -
 .../test-template/jobs/job1.job                 |   1 +
 .../test-template/jobs/job2.conf                |   3 -
 .../test-template/jobs/job2.job                 |   3 +
 .../test-template/jobs/job3.conf                |   3 -
 .../test-template/jobs/job3.job                 |   2 +
 .../test-template/jobs/job4.conf                |   3 -
 .../test-template/jobs/job4.job                 |   2 +
 89 files changed, 3082 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
index a1337fd..5e132fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
@@ -51,7 +51,7 @@ public class HOCONInputStreamJobTemplate extends StaticJobTemplate {
     this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)), uri, catalog);
   }
 
-  private HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
+  public HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
       throws SpecNotFoundException, TemplateException {
     super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
         config.hasPath(ConfigurationKeys.JOB_DESCRIPTION_KEY) ? config.getString(ConfigurationKeys.JOB_DESCRIPTION_KEY) : "",

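With the constructor widened from private to public, a job template can now be constructed directly
from an in-memory Config by code outside this class, rather than only via an InputStream. A minimal
sketch, assuming the caller already holds a JobCatalogWithTemplates; the URI and the version key
literal below are illustrative placeholders, not values taken from the code:

    import java.net.URI;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
    import org.apache.gobblin.runtime.template.HOCONInputStreamJobTemplate;

    public class JobTemplateSketch {
      public static HOCONInputStreamJobTemplate fromConfig(JobCatalogWithTemplates catalog) throws Exception {
        // "gobblin.template.version" is an assumed spelling of VERSION_KEY.
        Config templateConfig = ConfigFactory.parseString("gobblin.template.version=1");
        return new HOCONInputStreamJobTemplate(templateConfig, new URI("template:example"), catalog);
      }
    }
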
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 2361edc..c4d3656 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -107,7 +107,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
         ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
   }
 
-  synchronized void setActive(boolean isActive) {
+  public synchronized void setActive(boolean isActive) {
     if (this.isActive == isActive) {
       // No-op if already in correct state
       return;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
deleted file mode 100644
index 7d7e2b4..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
-
-import lombok.Getter;
-
-
-/**
- * An implementation of {@link HdfsDatasetDescriptor}.
- */
-@Alpha
-public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
-  @Getter
-  private final String path;
-  @Getter
-  private final String format;
-  @Getter
-  private final String description;
-  @Getter
-  private final String platform;
-
-  public BaseHdfsDatasetDescriptor(Config config) {
-    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY));
-    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY));
-
-    this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
-    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
-    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
-    this.platform = "hdfs";
-  }
-
-  /**
-   * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical
-   * platform, type, path, and format.
-   * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
-   * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns.
-   * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
-   * two descriptors should be compatible.
-   * @return true if this {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor}.
-   */
-  @Override
-  public boolean isCompatibleWith(DatasetDescriptor o) {
-    return this.equals(o);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
-    if(this.getPlatform() == null || other.getPlatform() == null) {
-      return false;
-    }
-    if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) {
-      return false;
-    }
-
-    return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat());
-  }
-
-  @Override
-  public String toString() {
-     return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return this.toString().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 4a322e6..e8474e3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -17,28 +17,54 @@
 
 package org.apache.gobblin.service.modules.dataset;
 
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alpha;
 
 
 /**
- * The interface for dataset descriptors.
+ * The interface for dataset descriptors. Each dataset is described in terms of the following attributes:
+ *  <ul>
+ *    <p> platform (e.g. HDFS, ADLS, JDBC). </p>
+ *    <p> path, which describes the fully qualified name of the dataset. </p>
+ *    <p> a format descriptor, which encapsulates its representation (e.g. avro, csv), codec (e.g. gzip, deflate), and
+ *    encryption config (e.g. aes_rotating, gpg). </p>
+ *  </ul>
  */
 @Alpha
 public interface DatasetDescriptor {
   /**
-   * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+   * @return the dataset platform i.e. the storage system backing the dataset (e.g. HDFS, ADLS, JDBC etc.)
    */
   public String getPlatform();
 
   /**
+   * Returns the fully qualified name of a dataset. The fully qualified name is the absolute directory path of a dataset
+   * when the dataset is backed by a FileSystem. In the case of a database table, it is dbName.tableName.
+   * @return dataset path.
+   */
+  public String getPath();
+
+  /**
+   * @return the storage format of the dataset.
+   */
+  public FormatConfig getFormatConfig();
+
+  /**
    * @return a human-readable description of the dataset.
    */
   public String getDescription();
 
   /**
-   * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the
-   * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}.
-   * This check is non-commutative.
+   * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor}, i.e. the
+   * datasets described by the other {@link DatasetDescriptor} are a subset of the datasets described by this
+   * {@link DatasetDescriptor}. This operation is non-commutative.
+   */
+  public boolean contains(DatasetDescriptor other);
+
+  /**
+   * @return the raw config.
    */
-  public boolean isCompatibleWith(DatasetDescriptor other);
+  public Config getRawConfig();
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
new file mode 100644
index 0000000..21c7c17
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
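+/**
+ * Encapsulates the encryption attributes of a dataset: the encryption algorithm, the keystore type and the
+ * keystore encoding. Any attribute that is not explicitly configured defaults to the
+ * {@link DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY} wildcard.
+ */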
+public class EncryptionConfig {
+  @Getter
+  private final String encryptionAlgorithm;
+  @Getter
+  private final String keystoreType;
+  @Getter
+  private final String keystoreEncoding;
+
+  public EncryptionConfig(Config encryptionConfig) {
+    this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+  }
+
+  public boolean contains(EncryptionConfig other) {
+    if (other == null) {
+      return false;
+    }
+
+    String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
+    String otherKeystoreType = other.getKeystoreType();
+    String otherKeystoreEncoding = other.getKeystoreEncoding();
+
+    return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+        || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
+        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+        || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
+        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
+        || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof EncryptionConfig)) {
+      return false;
+    }
+    EncryptionConfig other = (EncryptionConfig) o;
+    return this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) && this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
+        && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.encryptionAlgorithm, this.keystoreType, this.keystoreEncoding) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
+    result = 31 * result + keystoreType.toLowerCase().hashCode();
+    result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+    return result;
+  }
+}

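Because every attribute missing from the config falls back to the wildcard, an EncryptionConfig built
from an empty Config contains every other EncryptionConfig, while the reverse check fails as soon as
one attribute is pinned down. A sketch of that asymmetry; the key names algorithm, keystore_type and
keystore_encoding are assumed spellings of the DatasetDescriptorConfigKeys constants, so the expected
outputs hold only under that assumption:

    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.service.modules.dataset.EncryptionConfig;

    public class EncryptionConfigSketch {
      public static void main(String[] args) {
        EncryptionConfig wildcard = new EncryptionConfig(ConfigFactory.empty());
        EncryptionConfig gpg = new EncryptionConfig(ConfigFactory.parseString(
            "algorithm=gpg\nkeystore_type=json\nkeystore_encoding=base64"));
        System.out.println(wildcard.contains(gpg));  // expected: true, every field of wildcard is the "any" token
        System.out.println(gpg.contains(wildcard));  // expected: false, a concrete field never matches the wildcard
      }
    }
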
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
new file mode 100644
index 0000000..a5cb717
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * An implementation of {@link DatasetDescriptor} with FS-based storage.
+ */
+@Alpha
+public class FSDatasetDescriptor implements DatasetDescriptor {
+  @Getter
+  private final String platform;
+  @Getter
+  private final String path;
+  @Getter
+  private final FormatConfig formatConfig;
+  @Getter
+  private final String description;
+  @Getter
+  private final Config rawConfig;
+
+  public FSDatasetDescriptor(Config config) {
+    Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform");
+    this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
+    this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+    this.formatConfig = new FormatConfig(config);
+    this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+    this.rawConfig = config;
+  }
+
+  /**
+   * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of the paths
+   * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
+   * is a glob pattern, we return false.
+   *
+   * @param otherPath the concrete path of the other {@link DatasetDescriptor}.
+   * @return true if the glob pattern described by this {@link DatasetDescriptor}'s path matches the otherPath.
+   */
+  public boolean isPathContaining(String otherPath) {
+    if (otherPath == null) {
+      return false;
+    }
+    if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
+      return true;
+    }
+    if (PathUtils.isGlob(new Path(otherPath))) {
+      return false;
+    }
+    GlobPattern globPattern = new GlobPattern(this.getPath());
+    return globPattern.matches(otherPath);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean contains(DatasetDescriptor o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FSDatasetDescriptor)) {
+      return false;
+    }
+    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+
+    if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+      return false;
+    }
+
+    return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath());
+  }
+
+  /**
+   * @param o the object to compare against.
+   * @return true iff "this" dataset descriptor is compatible with the "other" and the "other" dataset descriptor is
+   * compatible with this dataset descriptor.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FSDatasetDescriptor)) {
+      return false;
+    }
+    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+    if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+      return false;
+    }
+    return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig());
+  }
+
+  @Override
+  public String toString() {
+     return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + platform.toLowerCase().hashCode();
+    result = 31 * result + path.hashCode();
+    result = 31 * result + getFormatConfig().hashCode();
+    return result;
+  }
+
+}

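The combination of contains() and isPathContaining() makes containment deliberately non-commutative:
a descriptor whose path is a glob covers every concrete path it matches, but a glob path is never
contained by another descriptor. A sketch of that behavior; the keys platform, path and format are
assumed spellings of the DatasetDescriptorConfigKeys constants (format is omitted on the broad
descriptor so that it defaults to the wildcard):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;

    public class FSDatasetDescriptorSketch {
      public static void main(String[] args) {
        Config broad = ConfigFactory.parseString("platform=hdfs\npath=\"/data/*\"");
        Config narrow = ConfigFactory.parseString("platform=hdfs\npath=\"/data/tracking\"\nformat=avro");
        FSDatasetDescriptor broadDescriptor = new FSDatasetDescriptor(broad);
        FSDatasetDescriptor narrowDescriptor = new FSDatasetDescriptor(narrow);
        System.out.println(broadDescriptor.contains(narrowDescriptor));  // expected: true
        System.out.println(narrowDescriptor.contains(broadDescriptor));  // expected: false, a glob is never contained
      }
    }
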
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
new file mode 100644
index 0000000..a36182c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A location-independent descriptor that describes a dataset in terms of its physical attributes.
+ * The physical attributes include:
+ *  <ul>
+ *    <p> Data format (e.g. Avro, CSV, JSON). </p>
+ *    <p> Data encoding type (e.g. Gzip, Bzip2, Base64, Deflate). </p>
+ *    <p> Encryption properties (e.g. aes_rotating, gpg). </p>
+ *  </ul>
+ */
+@Alpha
+public class FormatConfig {
+  @Getter
+  private final String format;
+  @Getter
+  private final String codecType;
+  @Getter
+  private final EncryptionConfig encryptionConfig;
+
+  public FormatConfig(Config config) {
+    this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
+        .empty()));
+  }
+
+  public boolean contains(FormatConfig other) {
+    return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
+        && containsEncryptionConfig(other.getEncryptionConfig());
+  }
+
+  private boolean containsFormat(String otherFormat) {
+    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+        || (this.getFormat().equalsIgnoreCase(otherFormat));
+  }
+
+  private boolean containsCodec(String otherCodecType) {
+    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+        || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+  }
+
+  private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
+    return this.getEncryptionConfig().contains(otherEncryptionConfig);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof FormatConfig)) {
+      return false;
+    }
+    FormatConfig other = (FormatConfig) o;
+    return this.getFormat().equalsIgnoreCase(other.getFormat()) && this.getCodecType().equalsIgnoreCase(other.getCodecType())
+        && this.getEncryptionConfig().equals(other.getEncryptionConfig());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), this.getEncryptionConfig().toString()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + codecType.toLowerCase().hashCode();
+    result = 31 * result + format.toLowerCase().hashCode();
+    result = 31 * result + encryptionConfig.hashCode();
+    return result;
+  }
+}

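A FormatConfig built from an empty Config degenerates to the all-wildcard descriptor, which contains
every concrete FormatConfig; the reverse check fails as soon as any attribute is concrete. A sketch
under the same caveat as above, with format, codec and the encryption prefix as assumed key spellings:

    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.service.modules.dataset.FormatConfig;

    public class FormatConfigSketch {
      public static void main(String[] args) {
        FormatConfig concrete = new FormatConfig(ConfigFactory.parseString(
            "format=json\ncodec=gzip\nencryption.algorithm=aes_rotating"));
        FormatConfig wildcard = new FormatConfig(ConfigFactory.empty());
        System.out.println(wildcard.contains(concrete));  // expected: true
        System.out.println(concrete.contains(wildcard));  // expected: false
      }
    }
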
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
deleted file mode 100644
index 6f1970c..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * A descriptor interface for HDFS datasets
- */
-@Alpha
-public interface HdfsDatasetDescriptor extends DatasetDescriptor {
-  /**
-   *
-   * @return dataset path.
-   */
-  public String getPath();
-
-  /**
-   *
-   * @return storage format of the dataset.
-   */
-  public String getFormat();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
new file mode 100644
index 0000000..daff8ce
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+
+
+/**
+ * A helper class used to maintain additional context associated with each {@link FlowEdge} during path
+ * computation, while the edge is being evaluated for eligibility. The additional context includes the input
+ * {@link DatasetDescriptor} of this edge, which is compatible with the previous {@link FlowEdge}'s output
+ * {@link DatasetDescriptor} (where "previous" means the {@link FlowEdge} visited immediately before the
+ * current {@link FlowEdge}), and the corresponding output dataset descriptor of the current {@link FlowEdge}.
+ */
+@AllArgsConstructor
+@EqualsAndHashCode(exclude = {"mergedConfig", "specExecutor"})
+@Getter
+public class FlowEdgeContext {
+  private FlowEdge edge;
+  private DatasetDescriptor inputDatasetDescriptor;
+  private DatasetDescriptor outputDatasetDescriptor;
+  private Config mergedConfig;
+  private SpecExecutor specExecutor;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
new file mode 100644
index 0000000..c642708
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * A class that returns a {@link Dag} of {@link JobExecutionPlan}s from a sequence of edges
+ * represented as a {@link List} of {@link FlowEdgeContext}s.
+ */
+public class FlowGraphPath {
+  private List<FlowEdgeContext> path;
+  private FlowSpec flowSpec;
+  private Long flowExecutionId;
+
+  public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) {
+    this.path = path;
+    this.flowSpec = flowSpec;
+    this.flowExecutionId = flowExecutionId;
+  }
+
+  public Dag<JobExecutionPlan> asDag()
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
+    Iterator<FlowEdgeContext> pathIterator = path.iterator();
+    while (pathIterator.hasNext()) {
+      Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next());
+      flowDag = flowDag.concatenate(flowEdgeDag);
+    }
+    return flowDag;
+  }
+
+  /**
+   * Given an instance of {@link FlowEdge}, this method returns a {@link Dag} of {@link JobExecutionPlan}s that moves data
+   * from the source of the {@link FlowEdge} to the destination of the {@link FlowEdge}.
+   * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
+   * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}.
+   */
+  private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext)
+      throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
+    DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+    DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+    Config mergedConfig = flowEdgeContext.getMergedConfig();
+    SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
+
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    //Get resolved job configs from the flow template
+    List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
+    //Iterate over each resolved job config and convert the config to a JobSpec.
+    for (Config resolvedJobConfig : resolvedJobConfigs) {
+      jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId));
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+}

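asDag() folds the per-hop dags together with concatenate(), so the jobs of each hop depend on the
completion of the previous hop. A minimal caller sketch, assuming the list of FlowEdgeContexts and
the FlowSpec are already in hand:

    import java.util.List;

    import org.apache.gobblin.runtime.api.FlowSpec;
    import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
    import org.apache.gobblin.service.modules.flow.FlowGraphPath;
    import org.apache.gobblin.service.modules.flowgraph.Dag;
    import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

    public class FlowGraphPathSketch {
      public static Dag<JobExecutionPlan> planFor(List<FlowEdgeContext> hops, FlowSpec flowSpec) throws Exception {
        // The wall-clock flow execution id mirrors what the path finder generates per compilation.
        return new FlowGraphPath(hops, flowSpec, System.currentTimeMillis()).asDag();
      }
    }
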
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
new file mode 100644
index 0000000..2b4746c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
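+/**
+ * A path finder that uses Breadth-First Search over a {@link FlowGraph} to compute a path of {@link FlowEdge}s
+ * from the source {@link DataNode} to the destination {@link DataNode} described by a {@link FlowSpec}.
+ */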
+@Alpha
+@Slf4j
+public class FlowGraphPathFinder {
+  private static final String SOURCE_PREFIX = "source";
+  private static final String DESTINATION_PREFIX = "destination";
+
+  private FlowGraph flowGraph;
+  private FlowSpec flowSpec;
+  private Config flowConfig;
+
+  private DataNode srcNode;
+  private DataNode destNode;
+
+  private DatasetDescriptor srcDatasetDescriptor;
+  private DatasetDescriptor destDatasetDescriptor;
+
+  //Maintain path of FlowEdges as parent-child map
+  private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+  //Flow Execution Id
+  private Long flowExecutionId;
+
+  /**
+   * Constructor.
+   * @param flowGraph the {@link FlowGraph} to search for a path.
+   * @param flowSpec the {@link FlowSpec} whose config drives the search.
+   */
+  public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
+    this.flowGraph = flowGraph;
+    this.flowSpec = flowSpec;
+    this.flowConfig = flowSpec.getConfig();
+
+    //Get src/dest DataNodes from the flow config
+    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String destNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    this.srcNode = this.flowGraph.getNode(srcNodeId);
+    Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+    this.destNode = this.flowGraph.getNode(destNodeId);
+    Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+
+    //Get src/dest dataset descriptors from the flow config
+    Config srcDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config destDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+    try {
+      Class srcDatasetDescriptorClass =
+          Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+          .invokeLongestConstructor(srcDatasetDescriptorClass, srcDatasetDescriptorConfig);
+      Class destDatasetDescriptorClass =
+          Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+          .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
+   * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
+   * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
+   * @return a {@link FlowGraphPath} of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode,
+   * or null if no path is found.
+   */
+  public FlowGraphPath findPath() throws PathFinderException {
+    try {
+      //Initialization of auxiliary data structures used for path computation
+      this.pathMap = new HashMap<>();
+
+      // Generate flow execution id for this compilation
+      this.flowExecutionId = System.currentTimeMillis();
+
+      //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+      // flow graph.
+      // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
+      // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
+      synchronized (this.flowGraph) {
+        //Base condition 1: Source Node or Dest Node is inactive; return null
+        if (!srcNode.isActive() || !destNode.isActive()) {
+          log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+              this.destNode.getId());
+          return null;
+        }
+
+        //Base condition 2: Check if we are already at the target. If so, return an empty path.
+        if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+          return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId);
+        }
+
+        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+          this.pathMap.put(flowEdgeContext, flowEdgeContext);
+        }
+
+        //At every step, pop an edge E from the edge queue and mark it as visited. Generate the list of edges
+        // adjacent to E. For each adjacent edge E', do the following:
+        //    1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
+        //    2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of
+        //       edge E'.
+        // If the edge E' satisfies both 1 and 2, add it to the edge queue for further consideration.
+        while (!edgeQueue.isEmpty()) {
+          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+          DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+          DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+          //Are we done?
+          if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+            return constructPath(flowEdgeContext);
+          }
+
+          //Expand the currentNode to its adjacent edges and add them to the queue.
+          List<FlowEdgeContext> nextEdges =
+              getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+            //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+            // queue.
+            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+              edgeQueue.add(childFlowEdgeContext);
+              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+            }
+          }
+        }
+      }
+      //No path found. Return null.
+      return null;
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+      throw new PathFinderException(
+          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+    }
+  }
+
+  private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    return currentNode.equals(destNode) && currentDatasetDescriptor.equals(destDatasetDescriptor);
+  }
+
+  /**
+   * A helper method that sorts the {@link FlowEdge}s incident on dataNode based on whether the FlowEdge has an
+   * output {@link DatasetDescriptor} that is compatible with the destDatasetDescriptor.
+   * @param dataNode the {@link DataNode} whose outgoing {@link FlowEdge}s are explored.
+   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+   * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
+   */
+  private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      try {
+        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+        //Base condition: Skip this FlowEdge if it is inactive or if the destination of this edge is inactive.
+        if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+          continue;
+        }
+
+        boolean foundExecutor = false;
+        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+        for (SpecExecutor specExecutor: flowEdge.getExecutors()) {
+          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+              FlowEdgeContext flowEdgeContext;
+              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+                //If the datasets described by the currentDatasetDescriptor are a subset of the datasets described
+                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+                // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor);
+              } else {
+                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor);
+              }
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+                //Add to the front of the edge list if the platform-independent properties of the output descriptor are compatible
+                // with those of destination dataset descriptor.
+                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+                prioritizedEdgeList.add(0, flowEdgeContext);
+              } else {
+                prioritizedEdgeList.add(flowEdgeContext);
+              }
+              foundExecutor = true;
+            }
+          }
+          // Found a SpecExecutor. Proceed to the next FlowEdge.
+          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+          if (foundExecutor) {
+            break;
+          }
+        }
+      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+          | JobTemplate.TemplateException e) {
+        //Skip the edge; and continue
+        log.warn("Skipping edge {} with config {} due to exception", flowEdge.getId(), flowConfig, e);
+      }
+    }
+    return prioritizedEdgeList;
+  }
+
+  /**
+   * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below):
+   * <ul>
+   *   <p> the user provided flow config </p>
+   *   <p> edge specific properties/overrides </p>
+   *   <p> spec executor config/overrides </p>
+   *   <p> source node config </p>
+   *   <p> destination node config </p>
+   * </ul>
+   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+   * @param flowEdge An instance of {@link FlowEdge}.
+   * @param specExecutor A {@link SpecExecutor}.
+   * @return the merged config derived as described above.
+   */
+  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+      throws ExecutionException, InterruptedException {
+    Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+    Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+    Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
+        .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+    return mergedConfig;
+  }
+
+  /**
+   * Reconstruct the path by backtracking from the last edge to the first via the path map.
+   * @param flowEdgeContext the {@link FlowEdgeContext} of the last {@link FlowEdge} in the path.
+   * @return a {@link FlowGraphPath} from the source to the destination for the input {@link FlowSpec}.
+   * @throws IOException
+   * @throws SpecNotFoundException
+   * @throws JobTemplate.TemplateException
+   * @throws URISyntaxException
+   */
+  private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    //Backtrack from the last edge using the path map, prepending each edge to the path as we go.
+    List<FlowEdgeContext> path = new LinkedList<>();
+    path.add(flowEdgeContext);
+    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+    while (true) {
+      path.add(0, this.pathMap.get(currentFlowEdgeContext));
+      currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+      //Are we at the first edge in the path?
+      if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
+        break;
+      }
+    }
+    FlowGraphPath flowGraphPath = new FlowGraphPath(path, flowSpec, flowExecutionId);
+    return flowGraphPath;
+  }
+
+  public static class PathFinderException extends Exception {
+    public PathFinderException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public PathFinderException(String message) {
+      super(message);
+    }
+  }
+}
+

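The pathMap above doubles as the visited set and the parent-pointer table of the BFS: the initially
enqueued edges map to themselves, and constructPath() walks the pointers backwards until it reaches
such a self-loop. The same backtracking idiom, sketched independently of the Gobblin types:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    public class BacktrackSketch {
      /** Rebuild a path from a parent-pointer map whose first element maps to itself. */
      public static <T> List<T> backtrack(Map<T, T> parent, T last) {
        LinkedList<T> path = new LinkedList<>();
        T current = last;
        path.addFirst(current);
        while (!parent.get(current).equals(current)) {
          current = parent.get(current);
          path.addFirst(current);
        }
        return path;
      }
    }
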
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
new file mode 100644
index 0000000..8b14b10
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+
+/**
+ * Takes in a logical {@link Spec}, i.e. a flow, and compiles the corresponding materialized job {@link Spec}s
+ * and their mapping to {@link SpecExecutor}s.
+ */
+@Alpha
+@Slf4j
+public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+  @Getter
+  private FlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+  @Getter
+  private boolean active;
+
+  public MultiHopFlowCompiler(Config config) {
+    this(config, true);
+  }
+
+  public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
+    this(config, Optional.<Logger>absent(), instrumentationEnabled);
+  }
+
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
+    this(config, log, true);
+  }
+
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+    super(config, log, instrumentationEnabled);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog;
+    try {
+      flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    }
+    this.flowGraph = new BaseFlowGraph();
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+  }
+
+  public void setActive(boolean active) {
+    this.active = active;
+    this.gitFlowGraphMonitor.setActive(active);
+  }
+
+  /**
+   * TODO: We need to change the signature of compileFlow to return a Dag instead of a HashMap in order to capture
+   * job dependencies.
+   * @param spec a {@link FlowSpec} to compile.
+   * @return a map from each compiled job {@link Spec} to the {@link SpecExecutor} selected to run it, or null if
+   * no path is found or compilation fails.
+   */
+  @Override
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+    Preconditions.checkNotNull(spec);
+    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs");
+
+    long startTime = System.nanoTime();
+    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
+
+    FlowSpec flowSpec = (FlowSpec) spec;
+    String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
+    String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
+
+    FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
+    try {
+      //Compute the path from source to destination.
+      FlowGraphPath flowGraphPath = pathFinder.findPath();
+
+      //Convert the path into a Dag of JobExecutionPlans.
+      Dag<JobExecutionPlan> jobExecutionPlanDag;
+      if (flowGraphPath != null) {
+        jobExecutionPlanDag = flowGraphPath.asDag();
+      } else {
+        Instrumented.markMeter(this.flowCompilationFailedMeter);
+        log.info(String.format("No path found from source: %s and destination: %s", source, destination));
+        return null;
+      }
+
+      //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
+      // of a Map. For now just add all specs into the map.
+      for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = node.getValue();
+        specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
+      }
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
+        | URISyntaxException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+      return null;
+    }
+    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return specExecutorMap;
+  }
+
+  @Override
+  protected void populateEdgeTemplateMap() {
+    log.warn("No population of templates based on edge happen in this implementation");
+    return;
+  }
+
+
+}
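
A minimal usage sketch, with a placeholder catalog path; the FlowSpec construction follows the
pattern used in FlowGraphPathFinderTest later in this commit:

    Properties props = new Properties();
    props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
        "file:///path/to/template_catalog");  // placeholder catalog location
    MultiHopFlowCompiler compiler = new MultiHopFlowCompiler(ConfigFactory.parseProperties(props));
    compiler.setActive(true);  // also activates the GitFlowGraphMonitor that populates the flow graph

    Properties flowProps = new Properties();
    flowProps.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
    flowProps.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
    flowProps.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
    flowProps.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
    FlowSpec flowSpec = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
        .withConfig(ConfigUtils.propertiesToConfig(flowProps))
        .withVersion(FlowSpec.Builder.DEFAULT_VERSION)
        .build();

    // Returns null when no path exists between the two nodes or compilation fails.
    Map<Spec, SpecExecutor> jobSpecs = compiler.compileFlow(flowSpec);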

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 731bc22..4fb9711 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,9 +19,9 @@ package org.apache.gobblin.service.modules.flowgraph;
 
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.ConfigUtils;
 
 import joptsimple.internal.Strings;
@@ -38,7 +38,7 @@ public class BaseDataNode implements DataNode {
   @Getter
   private String id;
   @Getter
-  private Config props;
+  private Config rawConfig;
   @Getter
   private boolean active = true;
 
@@ -50,8 +50,8 @@ public class BaseDataNode implements DataNode {
       if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
         this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
       }
-      this.props = nodeProps;
-    } catch(Exception e) {
+      this.rawConfig = nodeProps;
+    } catch (Exception e) {
       throw new DataNodeCreationException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index fc82cc1..56f6c1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -44,7 +44,10 @@ import org.apache.gobblin.util.ConfigUtils;
 @Alpha
 public class BaseFlowEdge implements FlowEdge {
   @Getter
-  protected List<String> endPoints;
+  protected String src;
+
+  @Getter
+  protected String dest;
 
   @Getter
   protected FlowTemplate flowTemplate;
@@ -53,7 +56,7 @@ public class BaseFlowEdge implements FlowEdge {
   private List<SpecExecutor> executors;
 
   @Getter
-  private Config props;
+  private Config config;
 
   @Getter
   private String id;
@@ -63,11 +66,12 @@ public class BaseFlowEdge implements FlowEdge {
 
   //Constructor
   public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
-    this.endPoints = endPoints;
+    this.src = endPoints.get(0);
+    this.dest = endPoints.get(1);
     this.flowTemplate = flowTemplate;
     this.executors = executors;
     this.active = active;
-    this.props = properties;
+    this.config = properties;
     this.id = edgeId;
   }
 
@@ -91,7 +95,7 @@ public class BaseFlowEdge implements FlowEdge {
 
     FlowEdge that = (FlowEdge) o;
 
-    if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+    if (!(this.getSrc().equals(that.getSrc()) && this.getDest().equals(that.getDest()))) {
       return false;
     }
 
@@ -140,14 +144,14 @@ public class BaseFlowEdge implements FlowEdge {
           specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
         }
 
-        String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+        String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
 
         //Perform basic validation
         Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
         Preconditions
             .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
         Preconditions
-            .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+            .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate directory URI must not be null or empty");
         boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
 
         //Build SpecExecutor from config
@@ -158,7 +162,7 @@ public class BaseFlowEdge implements FlowEdge {
           SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
           specExecutors.add(executor);
         }
-        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+        FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
         return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
       } catch (RuntimeException e) {
         throw e;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 783f7ea..edf40cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -75,8 +75,8 @@ public class BaseFlowGraph implements FlowGraph {
    */
   @Override
   public synchronized boolean addFlowEdge(FlowEdge edge) {
-    String srcNode = edge.getEndPoints().get(0);
-    String dstNode = edge.getEndPoints().get(1);
+    String srcNode = edge.getSrc();
+    String dstNode = edge.getDest();
     if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
       return false;
     }
@@ -153,10 +153,10 @@ public class BaseFlowGraph implements FlowGraph {
    * if the {@link FlowEdge} is not in the graph, return false.
    */
   public synchronized boolean deleteFlowEdge(FlowEdge edge) {
-    if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) {
+    if(!dataNodeMap.containsKey(edge.getSrc())) {
       return false;
     }
-    DataNode node = dataNodeMap.get(edge.getEndPoints().get(0));
+    DataNode node = dataNodeMap.get(edge.getSrc());
     if(!nodesToEdges.get(node).contains(edge)) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 8ae0027..58bbb81 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -87,6 +87,10 @@ public class Dag<T> {
     return node.parentNodes;
   }
 
+  public boolean isEmpty() {
+    return this.nodes.isEmpty();
+  }
+
   /**
    * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag.
    * The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
@@ -97,9 +101,12 @@ public class Dag<T> {
    * @return the concatenated dag
    */
   public Dag<T> concatenate(Dag<T> other) throws IOException {
-    if (other == null) {
+    if (other == null || other.isEmpty()) {
       return this;
     }
+    if (this.isEmpty()) {
+      return other;
+    }
     for (DagNode node : this.endNodes) {
       this.parentChildMap.put(node, Lists.newArrayList());
       for (DagNode otherNode : other.startNodes) {
@@ -108,6 +115,7 @@ public class Dag<T> {
       }
       this.endNodes = other.endNodes;
     }
+    this.nodes.addAll(other.nodes);
     return this;
   }
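
To make the empty-dag semantics concrete, a small sketch; it assumes the Dag(List&lt;DagNode&lt;T&gt;&gt;)
and DagNode(T) constructors, which are not shown in this hunk, and a calling context that handles
the IOException declared by concatenate():

    List<Dag.DagNode<String>> noNodes = new ArrayList<>();
    Dag<String> empty = new Dag<>(noNodes);
    Dag<String> one = new Dag<>(Lists.newArrayList(new Dag.DagNode<>("job1")));

    Dag<String> result = empty.concatenate(one);  // with the isEmpty() check: returns "one"
    result = result.concatenate(empty);           // no-op: "one" comes back unchanged
    // For a real concatenation, nodes.addAll(other.nodes) ensures getNodes() also
    // reflects the nodes of the appended dag.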
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
index 4931685..b7a5274 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flowgraph;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 
 
 /**
@@ -37,7 +36,7 @@ public interface DataNode {
    * @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}
    * e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node.
    */
-  Config getProps();
+  Config getRawConfig();
 
   /**
    * @return true if the {@link DataNode} is active

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index e98337d..23e20c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -22,7 +22,25 @@ package org.apache.gobblin.service.modules.flowgraph;
  */
 public class DatasetDescriptorConfigKeys {
   //Gobblin Service Dataset Descriptor related keys
+  public static final String FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.input.dataset.descriptor";
+  public static final String FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.output.dataset.descriptor";
+
+  public static final String FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.input.dataset.descriptor";
+  public static final String FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.output.dataset.descriptor";
+
+  public static final String CLASS_KEY = "class";
+  public static final String PLATFORM_KEY = "platform";
   public static final String PATH_KEY = "path";
   public static final String FORMAT_KEY = "format";
+  public static final String CODEC_KEY = "codec";
   public static final String DESCRIPTION_KEY = "description";
+
+  //Dataset encryption related keys
+  public static final String ENCRYPTION_PREFIX = "encrypt";
+  public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
+  public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
+  public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding";
+
+  public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
+
 }
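
For illustration, a flow-level input descriptor assembled from these keys might look as follows
(values are made up; "any" reflects the DATASET_DESCRIPTOR_CONFIG_ANY wildcard value defined above):

    Config flowInput = ConfigFactory.parseString(
        "gobblin.flow.input.dataset.descriptor.platform = hdfs\n"
            + "gobblin.flow.input.dataset.descriptor.path = /data/out/testTeam/testDataset\n"
            + "gobblin.flow.input.dataset.descriptor.format = avro\n"
            + "gobblin.flow.input.dataset.descriptor.codec = gzip\n"
            + "gobblin.flow.input.dataset.descriptor.encrypt.algorithm = any");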

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
deleted file mode 100644
index 5899645..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
-
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
-
-/**
- * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
- * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
- */
-@Alpha
-public abstract class FileSystemDataNode extends BaseDataNode {
-  public static final String FS_URI_KEY = "fs.uri";
-  @Getter
-  private String fsUri;
-
-  /**
-   * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
-   */
-  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-    try {
-      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode");
-      URI uri = new URI(this.fsUri);
-      if(!isUriValid(uri)) {
-        throw new IOException("Invalid FS URI " + this.fsUri);
-      }
-    } catch(Exception e) {
-      throw new DataNodeCreationException(e);
-    }
-  }
-
-  public abstract boolean isUriValid(URI fsUri);
-  /**
-   * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    FileSystemDataNode that = (FileSystemDataNode) o;
-
-    return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
-  }
-
-  @Override
-  public int hashCode() {
-    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
index fb60d67..497bd5b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -41,9 +41,15 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
 public interface FlowEdge {
   /**
    *
-   * @return the {@link DataNode} ids that are the end points of the edge.
+   * @return the source {@link DataNode} id of the edge.
    */
-  List<String> getEndPoints();
+  String getSrc();
+
+  /**
+   *
+   * @return the destination {@link DataNode} id of the edge.
+   */
+  String getDest();
 
   /**
    *
@@ -62,7 +68,7 @@ public interface FlowEdge {
    * is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}.
    * @return the properties of this edge as a {@link Config} object.
    */
-  Config getProps();
+  Config getConfig();
 
   /**
    * A string uniquely identifying the edge.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 23f5793..b4aa7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -32,6 +32,13 @@ import org.apache.gobblin.annotation.Alpha;
 public interface FlowGraph {
 
   /**
+   * Get a {@link DataNode} from the node identifier
+   * @param nodeId {@link DataNode} identifier.
+   * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
+   */
+  public DataNode getNode(String nodeId);
+
+  /**
    * Add a {@link DataNode} to the {@link FlowGraph}
    * @param node {@link DataNode} to be added
    * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index cd4876a..8a49ec0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -22,7 +22,7 @@ public class FlowGraphConfigurationKeys {
   public static final String FLOW_EDGE_PREFIX = "flow.edge.";
 
   /**
-   *   {@link DataNode} configuration keys.
+   *   {@link DataNode} related configuration keys.
    */
   public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
   public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
@@ -30,7 +30,7 @@ public class FlowGraphConfigurationKeys {
   public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
 
   /**
-   * {@link FlowEdge} configuration keys.
+   * {@link FlowEdge} related configuration keys.
    */
   public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
   public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
@@ -39,7 +39,7 @@ public class FlowGraphConfigurationKeys {
   public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
   public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
   public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
-  public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+  public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
   public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
-  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+  public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
 }
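
As a sketch of the per-edge key layout after these renames (the same shape that
BaseFlowEdgeFactoryTest later in this commit uses; values are illustrative):

    Properties edgeProps = new Properties();
    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
    // SpecExecutors are indexed under flow.edge.specExecutors.<i>; each entry names its
    // implementation via specExecInstance.class plus executor-specific settings.
    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
            + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,
        "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
    edgeProps.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir", "/tmp1");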

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
deleted file mode 100644
index 7bcc18d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import joptsimple.internal.Strings;
-
-
-/**
- * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class HdfsDataNode extends FileSystemDataNode {
-  public static final String HDFS_SCHEME = "hdfs";
-
-  public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-  }
-
-  /**
-   *
-   * @param fsUri FileSystem URI
-   * @return true if the scheme is "hdfs" and authority is not empty.
-   */
-  @Override
-  public boolean isUriValid(URI fsUri) {
-    String scheme = fsUri.getScheme();
-    //Check that the scheme is "hdfs"
-    if(!scheme.equals(HDFS_SCHEME)) {
-      return false;
-    }
-    //Ensure that the authority is not empty
-    if(Strings.isNullOrEmpty(fsUri.getAuthority())) {
-      return false;
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
deleted file mode 100644
index 6dc1aa3..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import com.typesafe.config.Config;
-
-/**
- * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class LocalFSDataNode extends FileSystemDataNode {
-  public static final String LOCAL_FS_SCHEME = "file";
-
-  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
-    super(nodeProps);
-  }
-
-  /**
-   *
-   * @param fsUri FileSystem URI
-   * @return true if the scheme of fsUri equals "file"
-   */
-  @Override
-  public boolean isUriValid(URI fsUri) {
-    String scheme = fsUri.getScheme();
-    if(scheme.equals(LOCAL_FS_SCHEME)) {
-      return true;
-    }
-    return false;
-  }
-}


[2/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).

Posted by hu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
new file mode 100644
index 0000000..5d0500c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class FlowGraphPathFinderTest {
+  private FlowGraph flowGraph;
+  private FlowGraphPathFinder pathFinder;
+
+  @BeforeClass
+  public void setUp()
+      throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+    //Create a FlowGraph
+    this.flowGraph = new BaseFlowGraph();
+
+    //Add DataNodes to the graph from the node properties files
+    URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+    FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+    Path dataNodesPath = new Path(dataNodesUri);
+    ConfigParseOptions options = ConfigParseOptions.defaults()
+        .setSyntax(ConfigSyntax.PROPERTIES)
+        .setAllowMissing(false);
+
+    for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class dataNodeClass = Class.forName(ConfigUtils
+            .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+        DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+        this.flowGraph.addDataNode(dataNode);
+      }
+    }
+
+    //Create an FSFlowCatalog instance from the template catalog on the classpath
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+
+    //Add FlowEdges from the edge properties files
+    URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+    fs = FileSystem.get(flowEdgesURI, new Configuration());
+    Path flowEdgesPath = new Path(flowEdgesURI);
+    for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+      log.info("Loading FlowEdge config from " + fileStatus.getPath());
+      try (InputStream is = fs.open(fileStatus.getPath())) {
+        Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+        Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+            FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+        FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+        FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+        this.flowGraph.addFlowEdge(edge);
+      }
+    }
+
+    //Create a flow spec
+    Properties flowProperties = new Properties();
+    flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+    flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+    flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
+    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
+    Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+    //Get the input/output dataset config from a file
+    URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
+    Path flowConfigPath = new Path(flowConfigUri);
+    FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
+    try (InputStream is = fs1.open(flowConfigPath)) {
+      Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset()));
+      flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+    }
+
+    FlowSpec.Builder flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+        .withConfig(flowConfig)
+        .withDescription("dummy description")
+        .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
+
+    FlowSpec spec = flowSpecBuilder.build();
+    this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
+  }
+
+  @Test
+  public void testFindPath()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+             SpecNotFoundException, IOException {
+    Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+    Assert.assertEquals(jobDag.getNodes().size(), 4);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
+    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
+    JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
+
+    //Ensure the resolved job config for the first hop has the correct substitutions.
+    Config jobConfig = jobSpec.getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String from = jobConfig.getString("from");
+    String to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+    String sourceFsUri = jobConfig.getString("fs.uri");
+    Assert.assertEquals(sourceFsUri, "file:///");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    Assert.assertEquals(jobConfig.getString("type"), "java");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+    //Ensure the spec executor has the correct configurations
+    SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+    jobSpecWithExecutor = secondHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
+    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+    jobSpecWithExecutor = thirdHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
+    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+    jobSpecWithExecutor = fourthHopNode.getValue();
+    jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobSpecWithExecutor.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Ensure the fourth hop is the last
+    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  @Test (dependsOnMethods = "testFindPath")
+  public void testFindPathAfterFirstEdgeDeletion()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+             SpecNotFoundException, IOException {
+    //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+    Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+    Assert.assertEquals(jobDag.getNodes().size(), 4);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+    //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
+    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    JobExecutionPlan jobExecutionPlan = startNode.getValue();
+    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+
+    //Ensure the resolved job config for the first hop has the correct substitutions.
+    Config jobConfig = jobSpec.getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    String from = jobConfig.getString("from");
+    String to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+    String sourceFsUri = jobConfig.getString("fs.uri");
+    Assert.assertEquals(sourceFsUri, "file:///");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+    Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+    String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+    Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    Assert.assertEquals(jobConfig.getString("type"), "java");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+    //Ensure the spec executor has the correct configurations
+    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+    //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+    Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+    jobExecutionPlan = secondHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+    Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
+    Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+    jobExecutionPlan = thirdHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
+    Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+    Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+    jobExecutionPlan = fourthHopNode.getValue();
+    jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+    Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+    from = jobConfig.getString("from");
+    to = jobConfig.getString("to");
+    Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+    Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+    Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+    Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+    Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+    Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+    Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+    //Ensure the spec executor has the correct configurations
+    specExecutor = jobExecutionPlan.getSpecExecutor();
+    Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+    Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+    //Ensure the fourth hop is the last
+    Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+  }
+
+  @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
+  public void testFindPathAfterSecondEdgeDeletion()
+      throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+             SpecNotFoundException, IOException {
+    //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
+    this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+
+    //Ensure no path to destination.
+    Assert.assertNull(pathFinder.findPath());
+  }
+
+  @AfterClass
+  public void tearDown() {
+  }
+
+  public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+    // Executor Instance
+    protected final Config config;
+
+    private SpecProducer<Spec> azkabanSpecProducer;
+
+    public TestAzkabanSpecExecutor(Config config) {
+      super(config);
+      this.config = config;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      //Do nothing
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      //Do nothing
+    }
+
+    @Override
+    public Future<String> getDescription() {
+      return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+    }
+
+    @Override
+    public Future<? extends SpecProducer> getProducer() {
+      return new CompletedFuture<>(this.azkabanSpecProducer, null);
+    }
+
+    @Override
+    public Future<Config> getConfig() {
+      return new CompletedFuture<>(config, null);
+    }
+
+    @Override
+    public Future<String> getHealth() {
+      return new CompletedFuture<>("Healthy", null);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
new file mode 100644
index 0000000..2694f5c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BaseFlowEdgeFactoryTest {
+  @Test
+  public void testCreateFlowEdge() throws Exception {
+    Properties properties = new Properties();
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
+    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
+
+    FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
+
+    Properties props = new Properties();
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(props);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    Config edgeProps = ConfigUtils.propertiesToConfig(properties);
+    FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
+    Assert.assertEquals(flowEdge.getSrc(), "node1");
+    Assert.assertEquals(flowEdge.getDest(), "node2");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 04f2270..be7b597 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,8 +32,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.service.modules.template.StaticFlowTemplate;
 import org.apache.gobblin.util.ConfigUtils;
@@ -57,9 +54,7 @@ public class BaseFlowGraphTest {
 
   BaseFlowGraph graph;
   @BeforeClass
-  public void setUp()
-      throws URISyntaxException, ReflectiveOperationException, JobTemplate.TemplateException, SpecNotFoundException,
-             IOException, DataNode.DataNodeCreationException {
+  public void setUp() throws URISyntaxException, DataNode.DataNodeCreationException {
     Properties properties = new Properties();
     properties.put("key1", "val1");
     Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
@@ -81,9 +76,9 @@ public class BaseFlowGraphTest {
     //Create a clone of node3
     node3c = new BaseDataNode(node3Config);
 
-    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
-    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
-    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
+    FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null);
+    FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null);
+    FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null);
 
     //Create edge instances
     edgeId1 = "node1:node2:edge1";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
new file mode 100644
index 0000000..2542f5e
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class JobExecutionPlanDagFactoryTest {
+  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME;
+  private SpecExecutor specExecutor;
+  private List<JobTemplate> jobTemplates;
+
+  @BeforeClass
+  public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
+    // Create a FSFlowCatalog instance
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+    this.jobTemplates = flowTemplate.getJobTemplates();
+
+    //Create a spec executor instance
+    properties = new Properties();
+    properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
+    properties.put("specExecInstance.capabilities", "source:destination");
+    Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
+    this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
+  }
+
+  @Test
+  public void testCreateDag() throws Exception {
+    //Create a list of JobExecutionPlans
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (JobTemplate jobTemplate: this.jobTemplates) {
+      String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
+      jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+          withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor));
+    }
+
+    //Create a DAG from job execution plans.
+    Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+    //Test DAG properties
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+    Assert.assertEquals(dag.getNodes().size(), 4);
+    String startNodeName = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(startNodeName, "job1");
+    String templateUri = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job1.job");
+    String endNodeName = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getUri()).getName();
+    Assert.assertEquals(endNodeName, "job4");
+    templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+    Assert.assertEquals(templateUri, "job4.job");
+
+    Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
+    List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
+    Set<String> nodeSet = new HashSet<>();
+    for (Dag.DagNode<JobExecutionPlan> node: nextNodes) {
+      nodeSet.add(new Path(node.getValue().getJobSpec().getUri()).getName());
+      Dag.DagNode<JobExecutionPlan> nextNode = dag.getChildren(node).get(0);
+      Assert.assertEquals(new Path(nextNode.getValue().getJobSpec().getUri()).getName(), "job4");
+    }
+    Assert.assertTrue(nodeSet.contains("job2"));
+    Assert.assertTrue(nodeSet.contains("job3"));
+  }
+}
\ No newline at end of file
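
Note: taken together, the assertions in testCreateDag() pin down a diamond-shaped
DAG over the four job templates:

            job1
           /    \
        job2    job3
           \    /
            job4

job1 is the single start node and job4 the single end node; job2 and job3 form an
unordered middle layer, which is why the test collects their names into a Set
instead of asserting a fixed order.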

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
deleted file mode 100644
index 58d879e..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-public class JobTemplateDagFactoryTest {
-  private static final String TEST_TEMPLATE_NAME = "test-template";
-  private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
-  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
-  FSFlowCatalog catalog;
-
-  @BeforeClass
-  public void setUp()
-      throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
-    // Create a FSFlowCatalog instance
-    Properties properties = new Properties();
-    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(properties);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.catalog = new FSFlowCatalog(templateCatalogCfg);
-  }
-
-  @Test
-  public void testCreateDagFromJobTemplates() throws Exception {
-    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
-    List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
-
-    //Create a DAG from job templates.
-    Dag<JobTemplate> jobTemplateDag = JobTemplateDagFactory.createDagFromJobTemplates(jobTemplates);
-
-    //Test DAG properties
-    Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getNodes().size(), 4);
-    String startNodeName = new Path(jobTemplateDag.getStartNodes().get(0).getValue().getUri()).getName();
-    Assert.assertEquals(startNodeName, "job1.conf");
-    String endNodeName = new Path(jobTemplateDag.getEndNodes().get(0).getValue().getUri()).getName();
-    Assert.assertEquals(endNodeName, "job4.conf");
-
-    Dag.DagNode<JobTemplate> startNode = jobTemplateDag.getStartNodes().get(0);
-    List<Dag.DagNode<JobTemplate>> nextNodes = jobTemplateDag.getChildren(startNode);
-    Set<String> nodeSet = new HashSet<>();
-    for(Dag.DagNode<JobTemplate> node: nextNodes) {
-      nodeSet.add(new Path(node.getValue().getUri()).getName());
-      Dag.DagNode<JobTemplate> nextNode = jobTemplateDag.getChildren(node).get(0);
-      Assert.assertEquals(new Path(nextNode.getValue().getUri()).getName(), "job4.conf");
-    }
-    Assert.assertTrue(nodeSet.contains("job2.conf"));
-    Assert.assertTrue(nodeSet.contains("job3.conf"));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
index b20606f..3c8ebd3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
@@ -23,19 +23,20 @@ import java.util.Properties;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.dataset.HdfsDatasetDescriptor;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.testng.collections.Lists;
 
@@ -43,9 +44,8 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class FSFlowCatalogTest {
-  private static final String TEST_TEMPLATE_NAME = "test-template";
-  private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
-  private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
+  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
 
   @Test
   public void testGetFlowTemplate() throws Exception {
@@ -58,50 +58,45 @@ public class FSFlowCatalogTest {
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
     FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
-    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+    FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
 
     //Basic sanity check for the FlowTemplate
-    Dag<JobTemplate> jobTemplateDag = flowTemplate.getDag();
-    List<Dag.DagNode<JobTemplate>> dagNodes = jobTemplateDag.getNodes();
-    Assert.assertTrue(dagNodes.size() == 4);
-    Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
-    Dag.DagNode<JobTemplate> dagNode = jobTemplateDag.getStartNodes().get(0);
-    URI startNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job1.conf").toURI();
-    URI endNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job4.conf").toURI();
-    Assert.assertEquals(jobTemplateDag.getStartNodes().get(0).getValue().getUri(), startNodeUri);
-    Assert.assertEquals(jobTemplateDag.getEndNodes().get(0).getValue().getUri(), endNodeUri);
 
     List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
     Assert.assertEquals(jobTemplates.size(), 4);
-    for(int i=0; i<4; i++) {
+    for (int i = 0; i < 4; i++) {
       String uri = new Path(jobTemplates.get(i).getUri()).getName().split("\\.")[0];
       String templateId = uri.substring(uri.length() - 1);
-      for(int j=0; j<2; j++) {
+      for (int j = 0; j < 2; j++) {
         Config jobTemplateConfig = jobTemplates.get(i).getRawTemplateConfig();
-        String suffix = templateId + Integer.toString(j+1);
+        String suffix = templateId + Integer.toString(j + 1);
         Assert.assertEquals(jobTemplateConfig.getString("key" + suffix), "val" + suffix);
       }
     }
 
-    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getInputOutputDatasetDescriptors();
+    Config flowConfig = ConfigFactory.empty().withValue("team.name", ConfigValueFactory.fromAnyRef("test-team"))
+        .withValue("dataset.name", ConfigValueFactory.fromAnyRef("test-dataset"));
+
+    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getResolvingDatasetDescriptors(flowConfig);
     Assert.assertTrue(inputOutputDescriptors.size() == 2);
     List<String> dirs = Lists.newArrayList("inbound", "outbound");
-    for(int i=0; i<2; i++) {
-      for (int j=0; j<2; j++) {
-        HdfsDatasetDescriptor datasetDescriptor;
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        FSDatasetDescriptor datasetDescriptor;
         if (j == 0) {
-          datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
+          datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
         } else {
-          datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
+          datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
         }
         Assert.assertEquals(datasetDescriptor.getPlatform(), "hdfs");
-        Assert.assertEquals(datasetDescriptor.getFormat(), "avro");
-        Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/<TEAM_NAME>/<DATASET_NAME>");
+        Assert.assertEquals(datasetDescriptor.getFormatConfig().getFormat(), "avro");
+        Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/test-team/test-dataset");
       }
     }
     Config flowTemplateConfig = flowTemplate.getRawTemplateConfig();
-    Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.input.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
-    Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.output.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
+    Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX + ".0."
+        + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
+    Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX
+        + ".0." + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
   }
 }
\ No newline at end of file
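
Note: getResolvingDatasetDescriptors(flowConfig) resolves the
${team.name}/${dataset.name} placeholders in the template's descriptor config
against the supplied flow config, which is why the expected paths above read
/data/inbound/test-team/test-dataset and /data/outbound/test-team/test-dataset
rather than carrying placeholder tokens. A minimal, self-contained sketch of the
underlying Typesafe Config resolution (the key name "path" and the literal values
are illustrative only, not the template's actual keys):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    Config descriptor = ConfigFactory.parseString(
        "path=/data/inbound/${team.name}/${dataset.name}");
    Config flowConfig = ConfigFactory.parseString(
        "team.name=test-team\ndataset.name=test-dataset");
    // Merge the flow config in as a fallback, then resolve substitutions.
    String path = descriptor.withFallback(flowConfig).resolve().getString("path");
    // path -> "/data/inbound/test-team/test-dataset"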

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flow/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf
new file mode 100644
index 0000000..f818df6
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow.conf
@@ -0,0 +1,24 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+adls.user.to.proxy=adlsTestUser
+adls.oauth2.client.id=1234
+adls.oauth2.credential=credential
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - compressed and encrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=adls
+gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.output.dataset.descriptor.format=json
+gobblin.flow.output.dataset.descriptor.codec=gzip
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file
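
Note: the ${team.name} and ${dataset.name} references above are HOCON
substitutions resolved against the keys defined at the top of this file, so after
resolution the two descriptor paths are effectively:

    gobblin.flow.input.dataset.descriptor.path=/data/out/testTeam/testDataset
    gobblin.flow.output.dataset.descriptor.path=/data/encrypted/testTeam/testDataset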

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
new file mode 100644
index 0000000..a219e4f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=ADLS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.AdlsDataNode
+data.node.fs.uri=adl://azuredatalakestore.net/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
new file mode 100644
index 0000000..cad5e03
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn01.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
new file mode 100644
index 0000000..eeb7980
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-2
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn02.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
new file mode 100644
index 0000000..61135ba
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-3
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn03.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
new file mode 100644
index 0000000..a772f1c
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-4
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn04.grid.linkedin.com:8888/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
new file mode 100644
index 0000000..6683221
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=LocalFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.LocalFSDataNode
+data.node.fs.uri=file:///
\ No newline at end of file
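
Note: each of the datanode files above supplies a data.node.id, a data.node.class,
and a data.node.fs.uri. A plausible sketch of how such a file becomes a DataNode
(reflective construction via data.node.class is an assumption here, not confirmed
by this patch; BaseFlowGraphTest does show DataNodes being built from a single
Config argument):

    import java.util.Properties;

    import org.apache.commons.lang3.reflect.ConstructorUtils;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    // props holds the contents of e.g. LocalFsDataNode-1.properties
    Config nodeConfig = ConfigFactory.parseProperties(props);
    DataNode node = (DataNode) ConstructorUtils.invokeConstructor(
        Class.forName(nodeConfig.getString("data.node.class")), nodeConfig);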

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
new file mode 100644
index 0000000..bcf6d44
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
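
Note: the flow.edge.id values in these flowedge files follow the
<source>:<destination>:<edge name> convention also used in BaseFlowEdgeFactoryTest
("node1:node2:edge1"). The trailing edge name is what distinguishes multiple edges
between the same pair of nodes, including self-edges like this HDFS-1 to HDFS-1
convert-and-encrypt edge; a hypothetical second self-edge would differ only in
that final component, e.g.:

    flow.edge.id=HDFS-1:HDFS-1:hdfsCompressInPlace   (hypothetical edge name)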

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
new file mode 100644
index 0000000..99d1ed7
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -0,0 +1,10 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-3
+flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
new file mode 100644
index 0000000..537cbfa
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-2
+flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
new file mode 100644
index 0000000..6ec2ea5
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-4
+flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
new file mode 100644
index 0000000..ed6e899
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-3
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
new file mode 100644
index 0000000..eae2767
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-4
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
new file mode 100644
index 0000000..268b67f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
new file mode 100644
index 0000000..bc67810
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-2
+flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
deleted file mode 100644
index 43fa9a3..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.dircache.DirCache;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-
-
-public class GitFlowGraphMonitorTest {
-  private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);
-  private Repository remoteRepo;
-  private Git gitForPush;
-  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
-  private final File remoteDir = new File(TEST_DIR + "/remote");
-  private final File cloneDir = new File(TEST_DIR + "/clone");
-  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
-  private static final String NODE_1_FILE = "node1.properties";
-  private final File node1Dir = new File(flowGraphDir, "node1");
-  private final File node1File = new File(node1Dir, NODE_1_FILE);
-  private static final String NODE_2_FILE = "node2.properties";
-  private final File node2Dir = new File(flowGraphDir, "node2");
-  private final File node2File = new File(node2Dir, NODE_2_FILE);
-  private final File edge1Dir = new File(node1Dir, "node2");
-  private final File edge1File = new File(edge1Dir, "edge1.properties");
-
-  private RefSpec masterRefSpec = new RefSpec("master");
-  private FSFlowCatalog flowCatalog;
-  private Config config;
-  private BaseFlowGraph flowGraph;
-  private GitFlowGraphMonitor gitFlowGraphMonitor;
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    cleanUpDir(TEST_DIR);
-
-    // Create a bare repository
-    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
-    this.remoteRepo = fileKey.open(false);
-    this.remoteRepo.create(true);
-
-    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
-    // push an empty commit as a base for detecting changes
-    this.gitForPush.commit().setMessage("First commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.config = ConfigBuilder.create()
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
-            + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
-        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
-        .build();
-
-    // Create a FSFlowCatalog instance
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    Properties properties = new Properties();
-    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(properties);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
-    //Create a FlowGraph instance with defaults
-    this.flowGraph = new BaseFlowGraph();
-
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
-    this.gitFlowGraphMonitor.setActive(true);
-  }
-
-  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
-      throws IOException, GitAPIException {
-    // push a new node file
-    nodeDir.mkdirs();
-    nodeFile.createNewFile();
-    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
-
-    // add, commit, push node
-    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
-    this.gitForPush.commit().setMessage("Node commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if node1 has been added to the FlowGraph
-    DataNode dataNode = this.flowGraph.getNode(nodeId);
-    Assert.assertEquals(dataNode.getId(), nodeId);
-    Assert.assertTrue(dataNode.isActive());
-    Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
-  }
-
-  @Test
-  public void testAddNode()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
-    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
-  }
-
-  @Test (dependsOnMethods = "testAddNode")
-  public void testAddEdge()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    // push a new node file
-    this.edge1Dir.mkdirs();
-    this.edge1File.createNewFile();
-
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
-
-    // add, commit, push
-    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
-    this.gitForPush.commit().setMessage("Edge commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
-  }
-
-  @Test (dependsOnMethods = "testAddNode")
-  public void testUpdateEdge()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    //Update edge1 file
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
-        + "key1=value1\n", edge1File, Charsets.UTF_8);
-
-    // add, commit, push
-    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
-    this.gitForPush.commit().setMessage("Edge commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if new edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
-  }
-
-  @Test (dependsOnMethods = "testUpdateEdge")
-  public void testUpdateNode()
-      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
-    //Update param1 value in node1 and check if updated node is added to the graph
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
-  }
-
-
-  @Test (dependsOnMethods = "testUpdateNode")
-  public void testRemoveEdge() throws GitAPIException, IOException {
-    // delete a config file
-    edge1File.delete();
-
-    //Node1 has 1 edge before delete
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-
-    // delete, commit, push
-    DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
-        this.edge1Dir.getName(), this.edge1File.getName())).call();
-    RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if edge1 has been deleted from the graph
-    edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertTrue(edgeSet.size() == 0);
-  }
-
-  @Test (dependsOnMethods = "testRemoveEdge")
-  public void testRemoveNode() throws GitAPIException, IOException {
-    //delete node file
-    node1File.delete();
-
-    //node1 is present in the graph before delete
-    DataNode node1 = this.flowGraph.getNode("node1");
-    Assert.assertNotNull(node1);
-
-    // delete, commit, push
-    DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
-    RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call();
-    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
-    this.gitFlowGraphMonitor.processGitConfigChanges();
-
-    //Check if node1 has been deleted from the graph
-    node1 = this.flowGraph.getNode("node1");
-    Assert.assertNull(node1);
-  }
-
-
-  private void cleanUpDir(String dir) {
-    File specStoreDir = new File(dir);
-
-    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
-    for (int i = 0; i < 5; i++) {
-      try {
-        if (specStoreDir.exists()) {
-          FileUtils.deleteDirectory(specStoreDir);
-        }
-        // if delete succeeded then break out of loop
-        break;
-      } catch (IOException e) {
-        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
-      }
-    }
-  }
-
-  private String formNodeFilePath(String groupDir, String fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
-  }
-
-  private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
-  }
-
-  @AfterClass
-  public void tearDown() throws Exception {
-    cleanUpDir(TEST_DIR);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
deleted file mode 100644
index 9dd51a0..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-import java.util.Properties;
-
-import org.apache.gobblin.util.ConfigUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class BaseFlowEdgeFactoryTest {
-  @Test
-  public void testCreateFlowEdge() throws Exception {
-    Properties properties = new Properties();
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "FS:///test-template/flow.conf");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
-    properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
-
-    FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
-
-    Properties props = new Properties();
-    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-    props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
-    Config config = ConfigFactory.parseProperties(props);
-    Config templateCatalogCfg = config
-        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
-    Config edgeProps = ConfigUtils.propertiesToConfig(properties);
-    FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
-    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
-    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
-    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
-  }
-}
\ No newline at end of file