You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/30 16:57:37 UTC
[1/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow
Compiler for Gobblin-as-a-Service (GaaS).
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 33d4fea4b -> 22a951f0a
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
new file mode 100644
index 0000000..64d6921
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/flow.conf
@@ -0,0 +1,20 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/inbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
+
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/outbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.1.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+gobblin.flow.edge.output.dataset.descriptor.1.format=${gobblin.flow.edge.input.dataset.descriptor.1.format}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
new file mode 100644
index 0000000..0d4f7c3
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job1.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/templates/job1.template"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
new file mode 100644
index 0000000..c26ade4
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job2.job
@@ -0,0 +1,3 @@
+gobblin.template.uri="resource:///template_catalog/templates/job2.template"
+
+dependencies=job1
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
new file mode 100644
index 0000000..cac20ed
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job3.job
@@ -0,0 +1,2 @@
+gobblin.template.uri="resource:///template_catalog/templates/job3.template"
+dependencies=job1
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
new file mode 100644
index 0000000..9b86c77
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/flowEdgeTemplate/jobs/job4.job
@@ -0,0 +1,2 @@
+gobblin.template.uri="resource:///template_catalog/templates/job4.template"
+dependencies="job2,job3"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
new file mode 100644
index 0000000..0a53e5b
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
@@ -0,0 +1,18 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+#############################################################
+# Define input dataset to be uncompressed and unencrypted
+#############################################################
+gobblin.flow.edge.input.dataset.descriptor.0.codec=NONE
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=NONE
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.0.format=json
+gobblin.flow.edge.output.dataset.descriptor.0.codec=gzip
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=aes_rotating
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=json
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
new file mode 100644
index 0000000..cda75cf
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
new file mode 100644
index 0000000..2cbf420
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
@@ -0,0 +1,17 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=json
+gobblin.flow.edge.input.dataset.descriptor.0.codec=gzip
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=aes_rotating
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type=json
+gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding=base64
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.output.dataset.descriptor.0.platform=adls
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
+gobblin.flow.edge.output.dataset.descriptor.0.codec=${gobblin.flow.edge.input.dataset.descriptor.0.codec}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type}
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=${gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
new file mode 100644
index 0000000..37d0d9c
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
new file mode 100644
index 0000000..abac6b5
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -0,0 +1,15 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
new file mode 100644
index 0000000..fe627c9
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
new file mode 100644
index 0000000..d0765e1
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
@@ -0,0 +1,9 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
new file mode 100644
index 0000000..fe627c9
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job
@@ -0,0 +1 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/distcp.template"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
new file mode 100644
index 0000000..b92a2bf
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template
@@ -0,0 +1,65 @@
+# ====================================================================
+# Job configurations
+# ====================================================================
+job.name=Distcp-HDFS-ADL
+
+#team.name and dataset.name to be supplied via flowConfig
+from=/data/encrypted/${team.name}/${dataset.name}
+to=/data/encrypted/${team.name}/${dataset.name}
+
+#Delete files in the target (ADL) that do not exist in the source
+gobblin.copy.recursive.update=true
+gobblin.copy.recursive.delete=true
+gobblin.copy.recursive.deleteEmptyDirectories=true
+gobblin.trash.skip.trash=true
+
+#Abort the whole job if any single dataset fails to copy
+gobblin.copy.abortOnSingleDatasetFailure=true
+
+#gobblin.copy.preserved.attributes=p
+
+#Job properties to be resolved from source and dest data node config.
+fs.uri=${source.data.node.fs.uri}
+source.filebased.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+target.filebased.fs.uri=${destination.data.node.fs.uri}
+writer.fs.uri=${target.filebased.fs.uri}
+
+#ADL parameters
+fs.AbstractFileSystem.adl.impl="org.apache.hadoop.fs.adl.Adl"
+dfs.adls.oauth2.access.token.provider.type=ClientCredential
+dfs.adls.oauth2.refresh.url="https://login.microsoftonline.com/67893-erty-1234-7678-123456/oauth2/token"
+dfs.adls.oauth2.client.id=${adls.oauth2.client.id}
+writer.encrypted.dfs.adls.oauth2.credential=${adls.oauth2.credential}
+
+encrypt.key.loc=/user/${user.to.proxy}/master.password
+work.dir=/tmp/${user.to.proxy}
+writer.user.to.proxy=${adls.user.to.proxy}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+extract.namespace="gobblin.copy"
+
+gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder"
+
+# target location for copy
+data.publisher.final.dir=${to}
+gobblin.dataset.pattern=${from}
+
+data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
+source.class="org.apache.gobblin.data.management.copy.CopySource"
+writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
+converter.classes="org.apache.gobblin.converter.IdentityConverter"
+
+# =======================================
+# Job Parameters to be resolved using SpecExecutor properties
+# =======================================
+type=${specExecInstance.job.type}
+
+job.jars="lib/*"
+job.lock.enabled=false
+job.class=${specExecInstance.job.launcher.class}
+
+# Gobblin Hadoop Parameters
+launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
new file mode 100644
index 0000000..844dc92
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -0,0 +1,57 @@
+# ====================================================================
+# Job configurations
+# ====================================================================
+job.name=Distcp-HDFS-HDFS
+
+# Source and destination paths to be obtained from flow config.
+from=${gobblin.flow.edge.input.dataset.descriptor.path}
+to=${gobblin.flow.edge.output.dataset.descriptor.path}
+
+#Delete files in the target that do not exist in the source
+gobblin.copy.recursive.update=true
+gobblin.copy.recursive.delete=true
+gobblin.copy.recursive.deleteEmptyDirectories=true
+gobblin.trash.skip.trash=true
+
+#Abort the whole job if any single dataset fails to copy
+gobblin.copy.abortOnSingleDatasetFailure=true
+
+#gobblin.copy.preserved.attributes=p
+
+#Job properties to be resolved from source and dest data node config.
+fs.uri=${source.data.node.fs.uri}
+source.filebased.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+target.filebased.fs.uri=${destination.data.node.fs.uri}
+writer.fs.uri=${target.filebased.fs.uri}
+
+work.dir=/tmp/${user.to.proxy}
+writer.user.to.proxy=${adls.user.to.proxy}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+extract.namespace="gobblin.copy"
+
+gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder"
+
+# target location for copy
+data.publisher.final.dir=${to}
+gobblin.dataset.pattern=${from}
+
+data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
+source.class="org.apache.gobblin.data.management.copy.CopySource"
+writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
+converter.classes="org.apache.gobblin.converter.IdentityConverter"
+
+# =======================================
+# Job Parameters to be resolved using SpecExecutor properties
+# =======================================
+type=${specExecInstance.job.type}
+
+job.jars="lib/*"
+job.lock.enabled=false
+job.class=${specExecInstance.job.launcher.class}
+
+# Gobblin Hadoop Parameters
+launcher.type=${specExecInstance.job.launcher.type}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
new file mode 100644
index 0000000..fcc78cd
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template
@@ -0,0 +1,42 @@
+# ====================================================================
+# Job configurations (can be changed)
+# ====================================================================
+job.name=convert-to-json-and-encrypt
+job.description="Convert date partitioned avro files to json and encrypt"
+from=/data/out/${team.name}/${dataset.name}
+to=/data/encrypted/${team.name}/${dataset.name}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+
+source.class="org.apache.gobblin.source.DatePartitionedAvroFileSource"
+date.partitioned.source.partition.pattern=yyyy-MM-dd
+date.partitioned.source.min.watermark.value=2017-03-01
+source.filebased.data.directory=${from}
+source.entity=avro
+
+converter.classes="org.apache.gobblin.converter.avro.AvroToJsonStringConverter,org.apache.gobblin.converter.string.StringToBytesConverter"
+
+writer.builder.class="org.apache.gobblin.writer.SimpleDataWriterBuilder"
+writer.output.format=json
+writer.codec.type=gzip
+simple.writer.prepend.size=false
+writer.partitioner.class="org.apache.gobblin.writer.partitioner.WorkUnitStateWriterPartitioner"
+writer.partition.pattern=${date.partitioned.source.partition.pattern}
+
+writer.encrypt.algorithm=aes_rotating
+writer.encrypt.keystore_type=json
+writer.encrypt.keystore_path="hdfs://path/to/keystore/keystore.json"
+
+data.publisher.type="org.apache.gobblin.publisher.BaseDataPublisher"
+data.publisher.appendExtractToFinalDir=false
+data.publisher.metadata.output_file="metadata.json"
+data.publisher.metadata.publish.writer=true
+
+data.publisher.final.dir=${to}
+
+task.maxretries=0
+workunit.retry.enabled=false
+
+qualitychecker.task.policies="org.apache.gobblin.policies.count.RowCountPolicy"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job1.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job1.template b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
new file mode 100644
index 0000000..321e984
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job1.template
@@ -0,0 +1,2 @@
+key11=val11
+key12=val12
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job2.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job2.template b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
new file mode 100644
index 0000000..5141d92
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job2.template
@@ -0,0 +1,2 @@
+key21=val21
+key22=val22
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job3.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job3.template b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
new file mode 100644
index 0000000..c192cc4
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job3.template
@@ -0,0 +1,2 @@
+key31=val31
+key32=val32
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/templates/job4.template
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/templates/job4.template b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
new file mode 100644
index 0000000..a6a508e
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/templates/job4.template
@@ -0,0 +1,2 @@
+key41=val41
+key42=val42
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf b/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
index a13af7d..85e686b 100644
--- a/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/flow.conf
@@ -1,16 +1,22 @@
-gobblin.flow.dataset.descriptor.input.0.class=org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor
-gobblin.flow.dataset.descriptor.input.0.path=/data/inbound/<TEAM_NAME>/<DATASET_NAME>
-gobblin.flow.dataset.descriptor.input.0.format=avro
+team.name=test-team
+dataset.name=test-dataset
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/inbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
-gobblin.flow.dataset.descriptor.output.0.class=${gobblin.flow.dataset.descriptor.input.0.class}
-gobblin.flow.dataset.descriptor.output.0.path=${gobblin.flow.dataset.descriptor.input.0.path}
-gobblin.flow.dataset.descriptor.output.0.format=${gobblin.flow.dataset.descriptor.input.0.format}
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
-gobblin.flow.dataset.descriptor.input.1.class=org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor
-gobblin.flow.dataset.descriptor.input.1.path=/data/outbound/<TEAM_NAME>/<DATASET_NAME>
-gobblin.flow.dataset.descriptor.input.1.format=avro
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/outbound/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.1.format=avro
-gobblin.flow.dataset.descriptor.output.1.class=${gobblin.flow.dataset.descriptor.input.1.class}
-gobblin.flow.dataset.descriptor.output.1.path=${gobblin.flow.dataset.descriptor.input.1.path}
-gobblin.flow.dataset.descriptor.output.1.format=${gobblin.flow.dataset.descriptor.input.1.format}
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=${gobblin.flow.edge.input.dataset.descriptor.1.path}
+gobblin.flow.edge.output.dataset.descriptor.1.format=${gobblin.flow.edge.input.dataset.descriptor.1.format}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
deleted file mode 100644
index 4a59fcc..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.conf
+++ /dev/null
@@ -1,2 +0,0 @@
-key11=val11
-key12=val12
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
new file mode 100644
index 0000000..00e274e
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job1.job
@@ -0,0 +1 @@
+gobblin.template.uri=resource:///template_catalog/templates/job1.template
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
deleted file mode 100644
index f174940..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies=job1
-key21=val21
-key22=val22
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
new file mode 100644
index 0000000..c4db05f
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job2.job
@@ -0,0 +1,3 @@
+gobblin.template.uri=resource:///template_catalog/templates/job2.template
+
+dependencies=job1
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
deleted file mode 100644
index fda7f39..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies=job1
-key31=val31
-key32=val32
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
new file mode 100644
index 0000000..59867b3
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job3.job
@@ -0,0 +1,2 @@
+gobblin.template.uri=resource:///template_catalog/templates/job3.template
+dependencies=job1
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
deleted file mode 100644
index c5ef881..0000000
--- a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-dependencies="job2,job3"
-key41=val41
-key42=val42
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
new file mode 100644
index 0000000..8fdc611
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/test-template/jobs/job4.job
@@ -0,0 +1,2 @@
+gobblin.template.uri=resource:///template_catalog/templates/job4.template
+dependencies=job2,job3
[3/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow
Compiler for Gobblin-as-a-Service (GaaS).
Posted by hu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
new file mode 100644
index 0000000..c6f5c74
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+
+/**
+ * An implementation of an ADL (Azure Data Lake) {@link org.apache.gobblin.service.modules.flowgraph.DataNode}.
+ */
+public class AdlsDataNode extends FileSystemDataNode {
+ public static final String ADLS_SCHEME = "adl";
+
+ public AdlsDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ }
+
+ /**
+ * @param fsUri FileSystem URI
+ * @return true if the scheme is "adl" and authority is not empty.
+ */
+ @Override
+ public boolean isUriValid(URI fsUri) {
+ String scheme = fsUri.getScheme();
+ //Check that the scheme is "adl"
+ if (!scheme.equals(ADLS_SCHEME)) {
+ return false;
+ }
+ //Ensure that the authority is not empty
+ if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
new file mode 100644
index 0000000..72f1a66
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
+ * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
+ */
+@Alpha
+public abstract class FileSystemDataNode extends BaseDataNode {
+ public static final String FS_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri";
+
+ @Getter
+ private String fsUri;
+
+ /**
+ * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
+ */
+ public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ try {
+ this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "fs.uri cannot be null or empty.");
+
+ //Validate the srcFsUri and destFsUri of the DataNode.
+ if (!isUriValid(new URI(this.fsUri))) {
+ throw new IOException("Invalid FS URI " + this.fsUri);
+ }
+ } catch (Exception e) {
+ throw new DataNodeCreationException(e);
+ }
+ }
+
+ public abstract boolean isUriValid(URI fsUri);
+ /**
+ * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FileSystemDataNode that = (FileSystemDataNode) o;
+
+ return this.getId().equals(that.getId()) && this.fsUri.equals(that.getFsUri());
+ }
+
+ @Override
+ public int hashCode() {
+ return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
new file mode 100644
index 0000000..5402074
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class HdfsDataNode extends FileSystemDataNode {
+ public static final String HDFS_SCHEME = "hdfs";
+
+ public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ }
+
+ /**
+ *
+ * @param fsUri FileSystem URI
+ * @return true if the scheme is "hdfs" and authority is not empty.
+ */
+ @Override
+ public boolean isUriValid(URI fsUri) {
+ String scheme = fsUri.getScheme();
+ //Check that the scheme is "hdfs"
+ if (!scheme.equals(HDFS_SCHEME)) {
+ return false;
+ }
+ //Ensure that the authority is not empty
+ if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
new file mode 100644
index 0000000..757d4a0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.typesafe.config.Config;
+
+/**
+ * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+ public static final String LOCAL_FS_SCHEME = "file";
+
+ public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ }
+
+ /**
+ *
+ * @param fsUri FileSystem URI
+ * @return true if the scheme of fsUri equals "file"
+ */
+ @Override
+ public boolean isUriValid(URI fsUri) {
+ String scheme = fsUri.getScheme();
+ if (scheme.equals(LOCAL_FS_SCHEME)) {
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
new file mode 100644
index 0000000..c0c9297
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A data class that encapsulates information for executing a job. This includes a {@link JobSpec} and a {@link SpecExecutor}
+ * where the {@link JobSpec} will be executed.
+ */
+@Data
+@AllArgsConstructor
+public class JobExecutionPlan {
+ private JobSpec jobSpec;
+ private SpecExecutor specExecutor;
+
+ public static class Factory {
+
+ public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId)
+ throws URISyntaxException {
+ JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId);
+ return new JobExecutionPlan(jobSpec, specExecutor);
+ }
+
+ /**
+ * Given a resolved job config, this helper method converts the config to a {@link JobSpec}.
+ * @param jobConfig resolved job config.
+ * @param flowSpec input FlowSpec.
+ * @return a {@link JobSpec} corresponding to the resolved job config.
+ */
+ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long flowExecutionId) throws URISyntaxException {
+ Config flowConfig = flowSpec.getConfig();
+
+ String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, "");
+ String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
+
+ //Modify the job name to include the flow group:flow name.
+ jobName = Joiner.on(":").join(flowGroup, flowName, jobName);
+
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
+ .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
+
+ //Get job template uri
+ URI jobTemplateUri = new URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
+ JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
+
+ //Add flowName to job spec
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)));
+
+ //Add job name
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
+
+ //Add flow execution id
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+ // Remove schedule
+ jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+ // Add job.name and job.group
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName)));
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)));
+
+ //Enable job lock for each job to prevent concurrent executions.
+ jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true)));
+
+ // Reset properties in Spec from Config
+ jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
+
+ return jobSpec;
+ }
+
+
+ /**
+ * A naive implementation of generating a jobSpec's URI within a multi-hop flow that follows the convention:
+ * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link ConfigurationKeys#JOB_NAME_KEY}.
+ */
+ public static URI jobSpecURIGenerator(String jobGroup, String jobName, FlowSpec flowSpec)
+ throws URISyntaxException {
+ return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(),
+ StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"), "/") + jobGroup
+ + "/" + jobName, null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
new file mode 100644
index 0000000..f942f8d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link JobExecutionPlan}s from
+ * a {@link List} of {@link JobExecutionPlan}s.
+ */
+@Alpha
+@Slf4j
+public class JobExecutionPlanDagFactory {
+
+ public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) {
+ //Maintain a mapping between job name and the corresponding JobExecutionPlan.
+ Map<String, Dag.DagNode<JobExecutionPlan>> JobExecutionPlanMap = new HashMap<>();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ /**
+ * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node
+ * to a HashMap.
+ */
+ for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ String jobName = getJobName(jobExecutionPlan);
+ if (jobName != null) {
+ JobExecutionPlanMap.put(jobName, dagNode);
+ }
+ }
+
+ /**
+ * Iterate over each {@link JobSpec} to get the dependencies of each {@link JobSpec}.
+ * For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and
+ * set the {@link Dag.DagNode}s corresponding to its dependencies as its parent nodes.
+ *
+ * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
+ */
+ for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ String jobName = getJobName(jobExecutionPlan);
+ if (jobName == null) {
+ continue;
+ }
+ Dag.DagNode<JobExecutionPlan> node = JobExecutionPlanMap.get(jobName);
+ Collection<String> dependencies = getDependencies(jobExecutionPlan.getJobSpec().getConfig());
+ for (String dependency : dependencies) {
+ Dag.DagNode<JobExecutionPlan> parentNode = JobExecutionPlanMap.get(dependency);
+ node.addParentNode(parentNode);
+ }
+ }
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ return dag;
+ }
+
+ /**
+ * Get job dependencies of a given job from its config.
+ * @param config of a job.
+ * @return a list of dependencies of the job.
+ */
+ private static List<String> getDependencies(Config config) {
+ return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
+ .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
+ }
+
+ /**
+ * The job name is derived from the {@link org.apache.gobblin.runtime.api.JobTemplate} URI. It is the
+ * simple name of the path component of the URI.
+ * @param jobExecutionPlan
+ * @return the simple name from the URI path.
+ */
+ private static String getJobName(JobExecutionPlan jobExecutionPlan) {
+ Optional<URI> jobTemplateUri = jobExecutionPlan.getJobSpec().getTemplateURI();
+ if (jobTemplateUri.isPresent()) {
+ return Files.getNameWithoutExtension(new Path(jobTemplateUri.get()).getName());
+ } else {
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index 30d8309..a8350fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -20,16 +20,16 @@ package org.apache.gobblin.service.modules.template;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
import com.typesafe.config.Config;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
/**
* An interface primarily for representing a flow of {@link JobTemplate}s. It also has
@@ -45,18 +45,37 @@ public interface FlowTemplate extends Spec {
/**
*
- * @return the {@link Dag<JobTemplate>} that backs the {@link FlowTemplate}.
+ * @return all configuration inside pre-written template.
*/
- Dag<JobTemplate> getDag() throws IOException;
+ Config getRawTemplateConfig();
/**
- *
- * @return all configuration inside pre-written template.
+ * @param userConfig a list of user customized attributes.
+ * @return list of input/output {@link DatasetDescriptor}s that fully resolve the {@link FlowTemplate} using the
+ * provided userConfig.
*/
- Config getRawTemplateConfig();
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig)
+ throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException;
+
+ /**
+ * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. A {@link FlowTemplate}
+ * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable
+ * @param userConfig User supplied Config
+ * @param inputDescriptor input {@link DatasetDescriptor}
+ * @param outputDescriptor output {@link DatasetDescriptor}
+ * @return true if the {@link FlowTemplate} is resolvable
+ */
+ boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+ throws SpecNotFoundException, JobTemplate.TemplateException;
/**
- * @return list of input/output {@link DatasetDescriptor}s for the {@link FlowTemplate}.
+ * Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} and returns a {@link List} of resolved
+ * job {@link Config}s.
+ * @param userConfig User supplied Config
+ * @param inputDescriptor input {@link DatasetDescriptor}
+ * @param outputDescriptor output {@link DatasetDescriptor}
+ * @return a list of resolved job {@link Config}s.
*/
- List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors();
+ List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+ throws SpecNotFoundException, JobTemplate.TemplateException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
index 553f067..ba9c091 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
+import java.net.URISyntaxException;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
@@ -43,15 +44,15 @@ public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate {
public static final String VERSION_KEY = "gobblin.flow.template.version";
private static final String DEFAULT_VERSION = "1";
- public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog)
- throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+ public HOCONInputStreamFlowTemplate(InputStream inputStream, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog)
+ throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException {
this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)).resolve(
- ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, catalog);
+ ConfigResolveOptions.defaults().setAllowUnresolved(true)), flowTemplateDirUri, catalog);
}
- public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog)
- throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
- super(uri, ConfigUtils.getString(config, VERSION_KEY, DEFAULT_VERSION),
+ public HOCONInputStreamFlowTemplate(Config config, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog)
+ throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException {
+ super(flowTemplateDirUri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config
.getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, catalog);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
deleted file mode 100644
index b89da4d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from
- * a {@link URI} of a {@link FlowTemplate}.
- */
-@Alpha
-@Slf4j
-public class JobTemplateDagFactory {
- public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
-
- public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) {
- Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
- List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>();
- /**
- * Create a {@link Dag.DagNode<JobTemplate>} for every {@link JobTemplate} in the flow. Add this node
- * to a {@link Map<URI,JobTemplate>}.
- */
- for (JobTemplate template : jobTemplates) {
- Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
- dagNodeList.add(dagNode);
- uriJobTemplateMap.put(template.getUri(), dagNode);
- }
-
- /**
- * Iterate over each {@link JobTemplate} to get the dependencies of each {@link JobTemplate}.
- * For each {@link JobTemplate}, get the corresponding {@link Dag.DagNode} and
- * set the {@link Dag.DagNode}s corresponding to the dependencies as its parent nodes.
- *
- * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
- */
- Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
- for (JobTemplate template : jobTemplates) {
- URI templateUri = template.getUri();
- Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
- Collection<String> dependencies = template.getDependencies();
- for (String dependency : dependencies) {
- URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
- Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri);
- node.addParentNode(parentNode);
- }
- }
- Dag<JobTemplate> dag = new Dag<>(dagNodeList);
- return dag;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 46f99d3..5f8dfda 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -19,38 +19,40 @@ package org.apache.gobblin.service.modules.template;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigResolveOptions;
+import com.typesafe.config.ConfigValueFactory;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.hadoop.fs.Path;
-
-import lombok.Getter;
/**
* A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template.
*/
@Alpha
+@Slf4j
public class StaticFlowTemplate implements FlowTemplate {
private static final long serialVersionUID = 84641624233978L;
- public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input";
- public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output";
- public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
-
@Getter
private URI uri;
@Getter
@@ -60,69 +62,74 @@ public class StaticFlowTemplate implements FlowTemplate {
@Getter
private transient FlowCatalogWithTemplates catalog;
@Getter
- private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors;
- @Getter
private List<JobTemplate> jobTemplates;
- private transient Dag<JobTemplate> dag;
-
private transient Config rawConfig;
- private boolean isTemplateMaterialized;
- public StaticFlowTemplate(URI uri, String version, String description, Config config,
+ public StaticFlowTemplate(URI flowTemplateDirUri, String version, String description, Config config,
FlowCatalogWithTemplates catalog)
- throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
- this.uri = uri;
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+ this.uri = flowTemplateDirUri;
this.version = version;
this.description = description;
- this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
this.rawConfig = config;
this.catalog = catalog;
- URI flowTemplateDir = new Path(this.uri).getParent().toUri();
- this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+ this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDirUri);
}
//Constructor for testing purposes
- public StaticFlowTemplate(URI uri, String version, String description, Config config,
- FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates)
- throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+ public StaticFlowTemplate(URI uri, String version, String description, Config config, FlowCatalogWithTemplates catalog, List<JobTemplate> jobTemplates) {
this.uri = uri;
this.version = version;
this.description = description;
- this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
this.rawConfig = config;
this.catalog = catalog;
this.jobTemplates = jobTemplates;
}
+
/**
* Generate the input/output dataset descriptors for the {@link FlowTemplate}.
+ * @param userConfig
+ * @return a List of Input/Output DatasetDescriptors that resolve this {@link FlowTemplate}.
*/
- private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config)
- throws IOException, ReflectiveOperationException {
- if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+ @Override
+ public List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig)
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException {
+ Config config = this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ if (!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX)
+ || !config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
throw new IOException("Flow template must specify at least one input/output dataset descriptor");
}
+
int i = 0;
- String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+ String inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList();
while (config.hasPath(inputPrefix)) {
- Config inputDescriptorConfig = config.getConfig(inputPrefix);
- DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig);
- String outputPrefix = Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i++));
- Config outputDescriptorConfig = config.getConfig(outputPrefix);
- DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig);
- result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
- inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+ try {
+ Config inputDescriptorConfig = config.getConfig(inputPrefix);
+ DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig);
+ String outputPrefix = Joiner.on(".")
+ .join(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i));
+ Config outputDescriptorConfig = config.getConfig(outputPrefix);
+ DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig);
+
+ if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) {
+ result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+ }
+ } catch (ReflectiveOperationException e) {
+ //Cannot instantiate I/O dataset descriptor due to missing config; skip and try the next one.
+ log.warn("Skipping dataset descriptor pair at prefix {}: {}", inputPrefix, e.getMessage());
+ }
+ inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(++i));
}
return result;
}
private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
throws ReflectiveOperationException {
- Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
- return (DatasetDescriptor) GobblinConstructorUtils
- .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+ Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+ return (DatasetDescriptor) GobblinConstructorUtils.invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
}
@Override
@@ -130,27 +137,53 @@ public class StaticFlowTemplate implements FlowTemplate {
return this.rawConfig;
}
- private void ensureTemplateMaterialized()
- throws IOException {
- try {
- if (!isTemplateMaterialized) {
- this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
- }
- this.isTemplateMaterialized = true;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
@Override
public List<JobTemplate> getJobTemplates() {
return this.jobTemplates;
}
+ private Config getResolvedFlowConfig(Config userConfig) {
+ return userConfig.withFallback(this.rawConfig);
+ }
+
+ /**
+ * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. A {@link FlowTemplate}
+ * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable
+ * @param userConfig User supplied Config
+ * @return true if the {@link FlowTemplate} is resolvable
+ */
@Override
- public Dag<JobTemplate> getDag()
- throws IOException {
- ensureTemplateMaterialized();
- return this.dag;
+ public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+ throws SpecNotFoundException, JobTemplate.TemplateException {
+ Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+ Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+ userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+ ConfigResolveOptions resolveOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+
+ for (JobTemplate template: this.jobTemplates) {
+ Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
+ if (!templateConfig.isResolved()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor,
+ DatasetDescriptor outputDescriptor)
+ throws SpecNotFoundException, JobTemplate.TemplateException {
+ Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+ Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+ userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+ List<Config> resolvedJobConfigs = new ArrayList<>();
+ for (JobTemplate jobTemplate: getJobTemplates()) {
+ Config resolvedJobConfig = jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
+ ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
+ resolvedJobConfigs.add(resolvedJobConfig);
+ }
+ return resolvedJobConfigs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
index 5dba91c..59c3c87 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.modules.template_catalog;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import com.google.common.base.Charsets;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,17 +48,22 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
import org.apache.gobblin.util.PathUtils;
+
/**
* An implementation of a catalog for {@link FlowTemplate}s. Provides basic API for retrieving a {@link FlowTemplate}
* from the catalog and for retrieving {@link JobTemplate}s that are part of a {@link FlowTemplate}.
* The flow and job configuration files are assumed to have the following path structure:
- * <p> /path/to/template/catalog/flowName/flow.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job1.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job2.(conf|pull) </p>
+ * <p> /path/to/template/catalog/flowName/flow.conf </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p>
*/
@Alpha
public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTemplates {
- public static final String JOB_TEMPLATE_DIR_NAME="jobs";
+ public static final String JOBS_DIR_NAME = "jobs";
+ public static final String FLOW_CONF_FILE_NAME = "flow.conf";
+ public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", ".template");
+ public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri";
+
protected static final String FS_SCHEME = "FS";
/**
@@ -59,64 +71,94 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla
* @param sysConfig that must contain the fully qualified path of the flow template catalog
* @throws IOException
*/
- public FSFlowCatalog(Config sysConfig) throws IOException {
- super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
+ public FSFlowCatalog(Config sysConfig)
+ throws IOException {
+ super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
}
/**
*
- * @param flowUri URI of the flow configuration file
+ * @param flowTemplateDirURI URI of the flow template directory
* @return a {@link FlowTemplate}
* @throws SpecNotFoundException
* @throws JobTemplate.TemplateException
* @throws IOException
*/
- public FlowTemplate getFlowTemplate(URI flowUri) throws SpecNotFoundException, JobTemplate.TemplateException, IOException {
+ public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+ throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
}
- if (!flowUri.getScheme().equals(FS_SCHEME)) {
- throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowUri.getScheme());
+ if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+ throw new RuntimeException(
+ "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
}
String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
// path of uri is location of template file relative to the job configuration root directory
- Path templateFullPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowUri.getPath()));
+ Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI.getPath()));
+ Path templateFullPath = PathUtils.mergePaths(templateDirPath, new Path(FLOW_CONF_FILE_NAME));
FileSystem fs = FileSystem.get(templateFullPath.toUri(), new Configuration());
try (InputStream is = fs.open(templateFullPath)) {
- return new HOCONInputStreamFlowTemplate(is, flowUri, this);
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException(e);
+ return new HOCONInputStreamFlowTemplate(is, flowTemplateDirURI, this);
}
}
/**
*
- * @param flowTemplateDirUri URI of the flow template directory
+ * @param flowTemplateDirURI URI of the flow template directory
* @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}.
* @throws IOException
* @throws SpecNotFoundException
* @throws JobTemplate.TemplateException
*/
- public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirUri)
- throws IOException, SpecNotFoundException, JobTemplate.TemplateException {
+ public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+
+ PathFilter extensionFilter = file -> {
+ for (String extension : JOB_FILE_EXTENSIONS) {
+ if (file.getName().endsWith(extension)) {
+ return true;
+ }
+ }
+ return false;
+ };
+
if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) {
throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
}
- if (!flowTemplateDirUri.getScheme().equals(FS_SCHEME)) {
- throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirUri.getScheme());
+ if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+ throw new RuntimeException(
+ "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
}
List<JobTemplate> jobTemplates = new ArrayList<>();
String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
- Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirUri));
- Path jobTemplatePath = new Path(templateDirPath, JOB_TEMPLATE_DIR_NAME);
+ Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI));
+ Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME);
FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new Configuration());
- for (FileStatus fileStatus : fs.listStatus(jobTemplatePath)) {
- try (InputStream is = fs.open(fileStatus.getPath())) {
- jobTemplates.add(new HOCONInputStreamJobTemplate(is, fileStatus.getPath().toUri(), this));
+
+ for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, extensionFilter)) {
+ Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+ if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) {
+ URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY));
+ //Strip out the initial "/"
+ URI actualResourceUri = new URI(templateUri.getPath().substring(1));
+ Path fullTemplatePath =
+ new Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI());
+ templateConfig = templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true));
}
+ jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, fileStatus.getPath().toUri(), this));
}
return jobTemplates;
}
+
+ private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved)
+ throws IOException {
+ ConfigResolveOptions options = ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved);
+ try (InputStream is = fs.open(filePath)) {
+ return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
index 41c0c9e..8bafe97 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.template_catalog;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import org.apache.gobblin.runtime.api.JobTemplate;
@@ -34,16 +35,18 @@ public interface FlowCatalogWithTemplates {
* Get {@link FlowTemplate} with given {@link URI}.
* @throws SpecNotFoundException if a {@link JobTemplate} with given {@link URI} cannot be found.
*/
- FlowTemplate getFlowTemplate(URI uri) throws SpecNotFoundException, IOException, JobTemplate.TemplateException;
+ FlowTemplate getFlowTemplate(URI uri)
+ throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException;
/**
*
- * @param flowUri
+ * @param flowTemplateDirURI URI of the flow template directory.
* @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}.
* @throws IOException
* @throws SpecNotFoundException
* @throws JobTemplate.TemplateException
*/
- public List<JobTemplate> getJobTemplatesForFlow(URI flowUri) throws IOException, SpecNotFoundException, JobTemplate.TemplateException;
+ public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
new file mode 100644
index 0000000..b5451bc
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class GitFlowGraphMonitorTest {
+ private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+ private Repository remoteRepo;
+ private Git gitForPush;
+ private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
+ private final File remoteDir = new File(TEST_DIR + "/remote");
+ private final File cloneDir = new File(TEST_DIR + "/clone");
+ private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+ private static final String NODE_1_FILE = "node1.properties";
+ private final File node1Dir = new File(flowGraphDir, "node1");
+ private final File node1File = new File(node1Dir, NODE_1_FILE);
+ private static final String NODE_2_FILE = "node2.properties";
+ private final File node2Dir = new File(flowGraphDir, "node2");
+ private final File node2File = new File(node2Dir, NODE_2_FILE);
+ private final File edge1Dir = new File(node1Dir, "node2");
+ private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+ private RefSpec masterRefSpec = new RefSpec("master");
+ private FSFlowCatalog flowCatalog;
+ private Config config;
+ private BaseFlowGraph flowGraph;
+ private GitFlowGraphMonitor gitFlowGraphMonitor;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ cleanUpDir(TEST_DIR);
+
+ // Create a bare repository
+ RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
+ this.remoteRepo = fileKey.open(false);
+ this.remoteRepo.create(true);
+
+ this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+ // push an empty commit as a base for detecting changes
+ this.gitForPush.commit().setMessage("First commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ this.config = ConfigBuilder.create()
+ .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+ + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
+ .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
+ .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+ .build();
+
+ // Create a FSFlowCatalog instance
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ Properties properties = new Properties();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+ //Create a FlowGraph instance with defaults
+ this.flowGraph = new BaseFlowGraph();
+
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
+ this.gitFlowGraphMonitor.setActive(true);
+ }
+
+ private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
+ throws IOException, GitAPIException {
+ // push a new node file
+ nodeDir.mkdirs();
+ nodeFile.createNewFile();
+ Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+
+ // add, commit, push node
+ this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
+ this.gitForPush.commit().setMessage("Node commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if the node identified by nodeId has been added to the FlowGraph
+ DataNode dataNode = this.flowGraph.getNode(nodeId);
+ Assert.assertEquals(dataNode.getId(), nodeId);
+ Assert.assertTrue(dataNode.isActive());
+ Assert.assertEquals(dataNode.getRawConfig().getString("param1"), paramValue);
+ }
+
+ // Verifies that node config files pushed to the git repo are materialized as
+ // DataNodes in the FlowGraph (two distinct nodes, distinct param values).
+ @Test
+ public void testAddNode()
+ throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+ testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
+ testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
+ }
+
+ // Verifies that an edge config file pushed to git yields a FlowEdge from node1 to
+ // node2 with two configured spec executors. Depends on testAddNode so both endpoint
+ // nodes already exist in the graph.
+ @Test (dependsOnMethods = "testAddNode")
+ public void testAddEdge()
+ throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+ // push a new edge file
+ this.edge1Dir.mkdirs();
+ this.edge1File.createNewFile();
+
+ // Edge config: source/destination nodes, edge name, active flag, the flow template
+ // URI to load, and two InMemorySpecExecutor entries (index .0 and .1) with distinct
+ // spec-store dirs and capabilities.
+ Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
+
+ // add, commit, push
+ this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+ this.gitForPush.commit().setMessage("Edge commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ // simulate a polling cycle so the pushed edge is applied to the graph
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if edge1 has been added to the FlowGraph
+ Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 1);
+ FlowEdge flowEdge = edgeSet.iterator().next();
+ Assert.assertEquals(flowEdge.getSrc(), "node1");
+ Assert.assertEquals(flowEdge.getDest(), "node2");
+ // Both spec executors must be instantiated with their per-index configuration.
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+ }
+
+ // Verifies that re-pushing an edge file replaces (not duplicates) the existing
+ // FlowEdge. The config is identical to testAddEdge plus an extra "key1=value1"
+ // property, whose presence proves the updated config was loaded.
+ @Test (dependsOnMethods = "testAddNode")
+ public void testUpdateEdge()
+ throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+ //Update edge1 file
+ Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
+ + "key1=value1\n", edge1File, Charsets.UTF_8);
+
+ // add, commit, push
+ this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+ this.gitForPush.commit().setMessage("Edge commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ // simulate a polling cycle so the updated edge is applied to the graph
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if new edge1 has been added to the FlowGraph; still exactly one edge from node1
+ Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 1);
+ FlowEdge flowEdge = edgeSet.iterator().next();
+ Assert.assertEquals(flowEdge.getSrc(), "node1");
+ Assert.assertEquals(flowEdge.getDest(), "node2");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+ // The new property proves the edge object was rebuilt from the updated file.
+ Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1");
+ }
+
+ // Verifies that re-pushing a node file replaces the existing DataNode: node1's
+ // "param1" changes from its earlier value to "value3".
+ @Test (dependsOnMethods = "testUpdateEdge")
+ public void testUpdateNode()
+ throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+ //Update param1 value in node1 and check if updated node is added to the graph
+ testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
+ }
+
+
+ // Verifies that removing an edge config file from git removes the corresponding
+ // FlowEdge from the graph. Deletes from the working tree AND stages the removal
+ // with git rm before committing/pushing.
+ @Test (dependsOnMethods = "testUpdateNode")
+ public void testRemoveEdge() throws GitAPIException, IOException {
+ // delete a config file
+ edge1File.delete();
+
+ //Node1 has 1 edge before delete
+ Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 1);
+
+ // delete, commit, push
+ DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
+ this.edge1Dir.getName(), this.edge1File.getName())).call();
+ RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ // simulate a polling cycle so the deletion is applied to the graph
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if edge1 has been deleted from the graph
+ edgeSet = this.flowGraph.getEdges("node1");
+ Assert.assertTrue(edgeSet.size() == 0);
+ }
+
+ // Verifies that removing a node config file from git removes the corresponding
+ // DataNode from the graph. Runs after testRemoveEdge so the node has no edges left.
+ @Test (dependsOnMethods = "testRemoveEdge")
+ public void testRemoveNode() throws GitAPIException, IOException {
+ //delete node file
+ node1File.delete();
+
+ //node1 is present in the graph before delete
+ DataNode node1 = this.flowGraph.getNode("node1");
+ Assert.assertNotNull(node1);
+
+ // delete, commit, push
+ DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
+ RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ // simulate a polling cycle so the deletion is applied to the graph
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if node1 has been deleted from the graph
+ node1 = this.flowGraph.getNode("node1");
+ Assert.assertNull(node1);
+ }
+
+
+ // Best-effort recursive delete of a test directory. Failures are logged and
+ // suppressed rather than failing the test run.
+ private void cleanUpDir(String dir) {
+ File specStoreDir = new File(dir);
+
+ // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+ for (int i = 0; i < 5; i++) {
+ try {
+ if (specStoreDir.exists()) {
+ FileUtils.deleteDirectory(specStoreDir);
+ }
+ // if delete succeeded then break out of loop
+ break;
+ } catch (IOException e) {
+ logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+ }
+ }
+ }
+
+ // Builds the repo-relative path of a node config file: <flowGraphDir>/<groupDir>/<fileName>.
+ // Used as the filepattern for git add/rm.
+ private String formNodeFilePath(String groupDir, String fileName) {
+ return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
+ }
+
+ // Builds the repo-relative path of an edge config file:
+ // <flowGraphDir>/<parentDir>/<groupDir>/<fileName>. Used as the filepattern for git add/rm.
+ private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
+ return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
+ }
+
+ // Removes the whole test working area (local clone, remote repo, flowgraph dirs).
+ @AfterClass
+ public void tearDown() throws Exception {
+ cleanUpDir(TEST_DIR);
+ }
+}
\ No newline at end of file
[4/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow
Compiler for Gobblin-as-a-Service (GaaS).
Posted by hu...@apache.org.
[GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS).
Closes #2393 from sv2000/multiHopCompiler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/22a951f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/22a951f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/22a951f0
Branch: refs/heads/master
Commit: 22a951f0a4ac0c963e99cd2a15989c62a08c81cf
Parents: 33d4fea
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jul 30 09:57:31 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jul 30 09:57:31 2018 -0700
----------------------------------------------------------------------
.../template/HOCONInputStreamJobTemplate.java | 2 +-
.../modules/core/GitMonitoringService.java | 2 +-
.../dataset/BaseHdfsDatasetDescriptor.java | 98 -----
.../modules/dataset/DatasetDescriptor.java | 38 +-
.../modules/dataset/EncryptionConfig.java | 90 ++++
.../modules/dataset/FSDatasetDescriptor.java | 138 ++++++
.../service/modules/dataset/FormatConfig.java | 102 +++++
.../modules/dataset/HdfsDatasetDescriptor.java | 40 --
.../service/modules/flow/FlowEdgeContext.java | 46 ++
.../service/modules/flow/FlowGraphPath.java | 90 ++++
.../modules/flow/FlowGraphPathFinder.java | 320 ++++++++++++++
.../modules/flow/MultiHopFlowCompiler.java | 157 +++++++
.../service/modules/flowgraph/BaseDataNode.java | 8 +-
.../service/modules/flowgraph/BaseFlowEdge.java | 20 +-
.../modules/flowgraph/BaseFlowGraph.java | 8 +-
.../gobblin/service/modules/flowgraph/Dag.java | 10 +-
.../service/modules/flowgraph/DataNode.java | 3 +-
.../flowgraph/DatasetDescriptorConfigKeys.java | 18 +
.../modules/flowgraph/FileSystemDataNode.java | 83 ----
.../service/modules/flowgraph/FlowEdge.java | 12 +-
.../service/modules/flowgraph/FlowGraph.java | 7 +
.../flowgraph/FlowGraphConfigurationKeys.java | 8 +-
.../service/modules/flowgraph/HdfsDataNode.java | 59 ---
.../modules/flowgraph/LocalFSDataNode.java | 51 ---
.../flowgraph/datanodes/fs/AdlsDataNode.java | 52 +++
.../datanodes/fs/FileSystemDataNode.java | 87 ++++
.../flowgraph/datanodes/fs/HdfsDataNode.java | 58 +++
.../flowgraph/datanodes/fs/LocalFSDataNode.java | 51 +++
.../service/modules/spec/JobExecutionPlan.java | 117 ++++++
.../spec/JobExecutionPlanDagFactory.java | 114 +++++
.../service/modules/template/FlowTemplate.java | 39 +-
.../template/HOCONInputStreamFlowTemplate.java | 13 +-
.../modules/template/JobTemplateDagFactory.java | 79 ----
.../modules/template/StaticFlowTemplate.java | 143 ++++---
.../modules/template_catalog/FSFlowCatalog.java | 90 ++--
.../FlowCatalogWithTemplates.java | 9 +-
.../modules/core/GitFlowGraphMonitorTest.java | 314 ++++++++++++++
.../modules/flow/FlowGraphPathFinderTest.java | 417 +++++++++++++++++++
.../flowgraph/BaseFlowEdgeFactoryTest.java | 74 ++++
.../modules/flowgraph/BaseFlowGraphTest.java | 13 +-
.../spec/JobExecutionPlanDagFactoryTest.java | 116 ++++++
.../template/JobTemplateDagFactoryTest.java | 92 ----
.../template_catalog/FSFlowCatalogTest.java | 53 ++-
.../src/test/resources/flow/flow.conf | 24 ++
.../datanodes/AdlsDataNode-1.properties | 3 +
.../datanodes/HdfsDataNode-1.properties | 3 +
.../datanodes/HdfsDataNode-2.properties | 3 +
.../datanodes/HdfsDataNode-3.properties | 3 +
.../datanodes/HdfsDataNode-4.properties | 3 +
.../datanodes/LocalFsDataNode-1.properties | 3 +
.../hdfs-1-to-hdfs-1-encrypt.properties | 9 +
.../flowedges/hdfs-1-to-hdfs-3.properties | 10 +
.../hdfs-2-to-hdfs-2-encrypt.properties | 9 +
.../flowedges/hdfs-2-to-hdfs-4.properties | 9 +
.../flowedges/hdfs-3-to-adls-1.properties | 13 +
.../flowedges/hdfs-4-to-adls-1.properties | 13 +
.../flowedges/local-to-hdfs-1.properties | 9 +
.../flowedges/local-to-hdfs-2.properties | 9 +
.../modules/core/GitFlowGraphMonitorTest.java | 314 --------------
.../flowgraph/BaseFlowEdgeFactoryTest.java | 73 ----
.../template_catalog/flowEdgeTemplate/flow.conf | 20 +
.../flowEdgeTemplate/jobs/job1.job | 1 +
.../flowEdgeTemplate/jobs/job2.job | 3 +
.../flowEdgeTemplate/jobs/job3.job | 2 +
.../flowEdgeTemplate/jobs/job4.job | 2 +
.../hdfsConvertToJsonAndEncrypt/flow.conf | 18 +
.../jobs/hdfs-encrypt-avro-to-json.job | 1 +
.../flowEdgeTemplates/hdfsToAdl/flow.conf | 18 +
.../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 1 +
.../flowEdgeTemplates/hdfsToHdfs/flow.conf | 15 +
.../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 1 +
.../flowEdgeTemplates/localToHdfs/flow.conf | 9 +
.../localToHdfs/jobs/distcp-local-to-hdfs.job | 1 +
.../distcp-push-hdfs-to-adl.template | 65 +++
.../multihop/jobTemplates/distcp.template | 57 +++
.../hdfs-convert-to-json-and-encrypt.template | 42 ++
.../template_catalog/templates/job1.template | 2 +
.../template_catalog/templates/job2.template | 2 +
.../template_catalog/templates/job3.template | 2 +
.../template_catalog/templates/job4.template | 2 +
.../template_catalog/test-template/flow.conf | 30 +-
.../test-template/jobs/job1.conf | 2 -
.../test-template/jobs/job1.job | 1 +
.../test-template/jobs/job2.conf | 3 -
.../test-template/jobs/job2.job | 3 +
.../test-template/jobs/job3.conf | 3 -
.../test-template/jobs/job3.job | 2 +
.../test-template/jobs/job4.conf | 3 -
.../test-template/jobs/job4.job | 2 +
89 files changed, 3082 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
index a1337fd..5e132fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java
@@ -51,7 +51,7 @@ public class HOCONInputStreamJobTemplate extends StaticJobTemplate {
this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)), uri, catalog);
}
- private HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
+ public HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog)
throws SpecNotFoundException, TemplateException {
super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION,
config.hasPath(ConfigurationKeys.JOB_DESCRIPTION_KEY) ? config.getString(ConfigurationKeys.JOB_DESCRIPTION_KEY) : "",
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 2361edc..c4d3656 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -107,7 +107,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
}
- synchronized void setActive(boolean isActive) {
+ public synchronized void setActive(boolean isActive) {
if (this.isActive == isActive) {
// No-op if already in correct state
return;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
deleted file mode 100644
index 7d7e2b4..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
-
-import lombok.Getter;
-
-
-/**
- * An implementation of {@link HdfsDatasetDescriptor}.
- */
-@Alpha
-public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor {
- @Getter
- private final String path;
- @Getter
- private final String format;
- @Getter
- private final String description;
- @Getter
- private final String platform;
-
- public BaseHdfsDatasetDescriptor(Config config) {
- Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY));
- Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY));
-
- this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null);
- this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null);
- this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
- this.platform = "hdfs";
- }
-
- /**
- * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical
- * platform, type, path, and format.
- * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s
- * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns.
- * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the
- * two descriptors should be compatible.
- * @return true if this {@link HdfsDatasetDescriptor} is compatibe with another {@link DatasetDescriptor}.
- */
- @Override
- public boolean isCompatibleWith(DatasetDescriptor o) {
- return this.equals(o);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o;
- if(this.getPlatform() == null || other.getPlatform() == null) {
- return false;
- }
- if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) {
- return false;
- }
-
- return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")";
- }
-
- @Override
- public int hashCode() {
- return this.toString().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 4a322e6..e8474e3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -17,28 +17,54 @@
package org.apache.gobblin.service.modules.dataset;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.annotation.Alpha;
/**
- * The interface for dataset descriptors.
+ * The interface for dataset descriptors. Each dataset is described in terms of the following attributes:
+ * <ul>
+ * <p> platform (e.g. HDFS, ADLS, JDBC). </p>
+ * <p> path, which describes the fully qualified name of the dataset. </p>
+ * <p> a format descriptor, which encapsulates its representation (e.g. avro, csv), codec (e.g. gzip, deflate), and
+ * encryption config (e.g. aes_rotating, gpg). </p>
+ * </ul>
*/
@Alpha
public interface DatasetDescriptor {
/**
- * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.)
+ * @return the dataset platform i.e. the storage system backing the dataset (e.g. HDFS, ADLS, JDBC etc.)
*/
public String getPlatform();
/**
+ * Returns the fully qualified name of a dataset. The fully qualified name is the absolute directory path of a dataset
+ * when the dataset is backed by a FileSystem. In the case of a database table, it is dbName.tableName.
+ * @return dataset path.
+ */
+ public String getPath();
+
+ /**
+ *
+ * @return storage format of the dataset.
+ */
+ public FormatConfig getFormatConfig();
+
+ /**
* @return a human-readable description of the dataset.
*/
public String getDescription();
/**
- * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the
- * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}.
- * This check is non-commutative.
+ * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
+ * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other
+ * {@link DatasetDescriptor}. This operation is non-commutative.
+ */
+ public boolean contains(DatasetDescriptor other);
+
+ /**
+ * @return the raw config.
*/
- public boolean isCompatibleWith(DatasetDescriptor other);
+ public Config getRawConfig();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
new file mode 100644
index 0000000..21c7c17
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Describes the encryption attributes of a dataset: encryption algorithm, keystore type,
+ * and keystore encoding. Any attribute absent from the supplied {@link Config} defaults to
+ * the {@code DATASET_DESCRIPTOR_CONFIG_ANY} wildcard, which matches every value in
+ * {@link #contains(EncryptionConfig)}.
+ */
+public class EncryptionConfig {
+ @Getter
+ private final String encryptionAlgorithm;
+ @Getter
+ private final String keystoreType;
+ @Getter
+ private final String keystoreEncoding;
+
+ public EncryptionConfig(Config encryptionConfig) {
+ // Each attribute falls back to the "any" wildcard when not configured.
+ this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ }
+
+ /**
+ * Containment check used for descriptor matching: for each attribute, this config either
+ * holds the "any" wildcard (matches everything) or must equal the other's attribute
+ * (case-insensitive). Non-commutative; returns false for a null argument.
+ */
+ public boolean contains(EncryptionConfig other) {
+ if (other == null) {
+ return false;
+ }
+
+ String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
+ String otherKeystoreType = other.getKeystoreType();
+ String otherKeystoreEncoding = other.getKeystoreEncoding();
+
+ return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+ || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
+ && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+ || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
+ && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
+ || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+ }
+
+ // Case-insensitive equality over all three attributes; kept consistent with hashCode()
+ // below, which lowercases each attribute before hashing.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof EncryptionConfig)) {
+ return false;
+ }
+ EncryptionConfig other = (EncryptionConfig) o;
+ return this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) && this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
+ && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.encryptionAlgorithm, this.keystoreType, this.keystoreEncoding) + ")";
+ }
+
+ // Lowercasing keeps the hash consistent with the case-insensitive equals() above.
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
+ result = 31 * result + keystoreType.toLowerCase().hashCode();
+ result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
new file mode 100644
index 0000000..a5cb717
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * An implementation of {@link DatasetDescriptor} with FS-based storage.
+ */
+@Alpha
+public class FSDatasetDescriptor implements DatasetDescriptor {
+ @Getter
+ private final String platform;
+ @Getter
+ private final String path;
+ @Getter
+ private final FormatConfig formatConfig;
+ @Getter
+ private final String description;
+ @Getter
+ private final Config rawConfig;
+
+ public FSDatasetDescriptor(Config config) {
+ Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform");
+ this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
+ this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+ this.formatConfig = new FormatConfig(config);
+ this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+ this.rawConfig = config;
+ }
+
+ /**
+ * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths
+ * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
+ * is a glob pattern, we return false.
+ *
+ * @param otherPath a glob pattern that describes a set of paths.
+ * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
+ */
+ public boolean isPathContaining(String otherPath) {
+ if (otherPath == null) {
+ return false;
+ }
+ if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
+ return true;
+ }
+ if (PathUtils.isGlob(new Path(otherPath))) {
+ return false;
+ }
+ GlobPattern globPattern = new GlobPattern(this.getPath());
+ return globPattern.matches(otherPath);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean contains(DatasetDescriptor o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FSDatasetDescriptor)) {
+ return false;
+ }
+ FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+
+ if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+ return false;
+ }
+
+ return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath());
+ }
+
+ /**
+ *
+ * @param o
+ * @return true iff "this" dataset descriptor is compatible with the "other" and the "other" dataset descriptor is
+ * compatible with this dataset descriptor.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FSDatasetDescriptor)) {
+ return false;
+ }
+ FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+ if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+ return false;
+ }
+ return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString()) + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + platform.toLowerCase().hashCode();
+ result = 31 * result + path.hashCode();
+ result = 31 * result + getFormatConfig().hashCode();
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
new file mode 100644
index 0000000..a36182c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A location-independent descriptor of a dataset, which describes a dataset in terms of its physical attributes.
+ * The physical attributes include:
+ * <ul>
+ * <p> Data format (e.g. Avro, CSV, JSON). </p>
+ * <p> Data encoding type (e.g. Gzip, Bzip2, Base64, Deflate). </p>
+ * <p> Encryption properties (e.g. aes_rotating, gpg). </p>
+ * </ul>
+ */
+@Alpha
+public class FormatConfig {
+ @Getter
+ private final String format;
+ @Getter
+ private final String codecType;
+ @Getter
+ private final EncryptionConfig encryptionConfig;
+
+ public FormatConfig(Config config) {
+ this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
+ .empty()));
+ }
+
+ public boolean contains(FormatConfig other) {
+ return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
+ && containsEncryptionConfig(other.getEncryptionConfig());
+ }
+
+ private boolean containsFormat(String otherFormat) {
+ return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+ || (this.getFormat().equalsIgnoreCase(otherFormat));
+ }
+
+ private boolean containsCodec(String otherCodecType) {
+ return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+ || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+ }
+
+ private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
+ return this.getEncryptionConfig().contains(otherEncryptionConfig);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof FormatConfig)) {
+ return false;
+ }
+ FormatConfig other = (FormatConfig) o;
+ return this.getFormat().equalsIgnoreCase(other.getFormat()) && this.getCodecType().equalsIgnoreCase(other.getCodecType())
+ && this.getEncryptionConfig().equals(other.getEncryptionConfig());
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), this.getEncryptionConfig().toString()) + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + codecType.toLowerCase().hashCode();
+ result = 31 * result + format.toLowerCase().hashCode();
+ result = 31 * result + encryptionConfig.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
deleted file mode 100644
index 6f1970c..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.dataset;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * A descriptor interface for HDFS datasets
- */
-@Alpha
-public interface HdfsDatasetDescriptor extends DatasetDescriptor {
- /**
- *
- * @return dataset path.
- */
- public String getPath();
-
- /**
- *
- * @return storage format of the dataset.
- */
- public String getFormat();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
new file mode 100644
index 0000000..daff8ce
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+
+
+/**
+ * A helper class used to maintain additional context associated with each {@link FlowEdge} during path
+ * computation while the edge is explored for its eligibility. The additional context includes the input
+ * {@link DatasetDescriptor} of this edge which is compatible with the previous {@link FlowEdge}'s output
+ * {@link DatasetDescriptor} (where "previous" means the immediately preceding {@link FlowEdge} visited before
+ * the current {@link FlowEdge}), and the corresponding output dataset descriptor of the current {@link FlowEdge}.
+ */
@AllArgsConstructor
@EqualsAndHashCode(exclude = {"mergedConfig", "specExecutor"})
@Getter
public class FlowEdgeContext {
  // The FlowEdge being explored during path computation.
  private FlowEdge edge;
  // Input descriptor of this edge that is compatible with the previous edge's output descriptor.
  private DatasetDescriptor inputDatasetDescriptor;
  // Corresponding output descriptor produced when this edge consumes the input above.
  private DatasetDescriptor outputDatasetDescriptor;
  // Config against which this edge's flow template resolved; excluded from equals/hashCode.
  private Config mergedConfig;
  // Executor chosen for this edge; excluded from equals/hashCode.
  private SpecExecutor specExecutor;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
new file mode 100644
index 0000000..c642708
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * A class that returns a {@link Dag} of {@link JobExecutionPlan}s from a sequence of edges
+ * represented as a {@link List} of {@link FlowEdgeContext}s.
+ */
+public class FlowGraphPath {
+ private List<FlowEdgeContext> path;
+ private FlowSpec flowSpec;
+ private Long flowExecutionId;
+
+ public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) {
+ this.path = path;
+ this.flowSpec = flowSpec;
+ this.flowExecutionId = flowExecutionId;
+ }
+
+ public Dag<JobExecutionPlan> asDag()
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+ Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
+ Iterator<FlowEdgeContext> pathIterator = path.iterator();
+ while (pathIterator.hasNext()) {
+ Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next());
+ flowDag = flowDag.concatenate(flowEdgeDag);
+ }
+ return flowDag;
+ }
+
+ /**
+ * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < JobExecutionPlan >} that moves data
+ * from the source of the {@link FlowEdge} to the destination of the {@link FlowEdge}.
+ * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
+ * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}.
+ */
+ private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext)
+ throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+ FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
+ DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+ DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+ Config mergedConfig = flowEdgeContext.getMergedConfig();
+ SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
+
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ //Get resolved job configs from the flow template
+ List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
+ //Iterate over each resolved job config and convert the config to a JobSpec.
+ for (Config resolvedJobConfig : resolvedJobConfigs) {
+ jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId));
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
new file mode 100644
index 0000000..2b4746c
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Alpha
+@Slf4j
+public class FlowGraphPathFinder {
+ private static final String SOURCE_PREFIX = "source";
+ private static final String DESTINATION_PREFIX = "destination";
+
+ private FlowGraph flowGraph;
+ private FlowSpec flowSpec;
+ private Config flowConfig;
+
+ private DataNode srcNode;
+ private DataNode destNode;
+
+ private DatasetDescriptor srcDatasetDescriptor;
+ private DatasetDescriptor destDatasetDescriptor;
+
+ //Maintain path of FlowEdges as parent-child map
+ private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+ //Flow Execution Id
+ private Long flowExecutionId;
+
+ /**
+ * Constructor.
+ * @param flowGraph
+ */
+ public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
+ this.flowGraph = flowGraph;
+ this.flowSpec = flowSpec;
+ this.flowConfig = flowSpec.getConfig();
+
+ //Get src/dest DataNodes from the flow config
+ String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+ String destNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+ this.srcNode = this.flowGraph.getNode(srcNodeId);
+ Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
+ this.destNode = this.flowGraph.getNode(destNodeId);
+ Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId);
+
+ //Get src/dest dataset descriptors from the flow config
+ Config srcDatasetDescriptorConfig =
+ flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+ Config destDatasetDescriptorConfig =
+ flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+ try {
+ Class srcdatasetDescriptorClass =
+ Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+ this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+ .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig);
+ Class destDatasetDescriptorClass =
+ Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+ this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+ .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
+ * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
+ * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
+ * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode.
+ */
+ public FlowGraphPath findPath() throws PathFinderException {
+ try {
+ //Initialization of auxiliary data structures used for path computation
+ this.pathMap = new HashMap<>();
+
+ // Generate flow execution id for this compilation
+ this.flowExecutionId = System.currentTimeMillis();
+
+ //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+ // flow graph.
+ // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
+ // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
+ synchronized (this.flowGraph) {
+ //Base condition 1: Source Node or Dest Node is inactive; return null
+ if (!srcNode.isActive() || !destNode.isActive()) {
+ log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+ this.destNode.getId());
+ return null;
+ }
+
+ //Base condition 2: Check if we are already at the target. If so, return an empty path.
+ if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+ return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId);
+ }
+
+ LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+ edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+ for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+ this.pathMap.put(flowEdgeContext, flowEdgeContext);
+ }
+
+ //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges
+ // to the edge E. For each adjacent edge E', do the following:
+ // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
+ // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
+ // edge E'. If yes, add the edge E' to the edge queue.
+ // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
+ while (!edgeQueue.isEmpty()) {
+ FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+ DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+ DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+ //Are we done?
+ if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+ return constructPath(flowEdgeContext);
+ }
+
+ //Expand the currentNode to its adjacent edges and add them to the queue.
+ List<FlowEdgeContext> nextEdges =
+ getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+ for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+ //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+ // queue.
+ if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+ edgeQueue.add(childFlowEdgeContext);
+ this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+ }
+ }
+ }
+ }
+ //No path found. Return null.
+ return null;
+ } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+ throw new PathFinderException(
+ "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+ }
+ }
+
+ private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+ DatasetDescriptor destDatasetDescriptor) {
+ if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an
+ * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor.
+ * @param dataNode
+ * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+ * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+ * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
+ */
+ private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+ DatasetDescriptor destDatasetDescriptor) {
+ List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+ for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+ try {
+ DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+ //Base condition: Skip this FLowEdge, if it is inactive or if the destination of this edge is inactive.
+ if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+ continue;
+ }
+
+ boolean foundExecutor = false;
+ //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+ for (SpecExecutor specExecutor: flowEdge.getExecutors()) {
+ Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+ flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+ for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+ DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+ DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+ if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+ FlowEdgeContext flowEdgeContext;
+ if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+ //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
+ // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+ // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+ flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor);
+ } else {
+ //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+ flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor);
+ }
+ if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+ //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
+ // with those of destination dataset descriptor.
+ // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+ prioritizedEdgeList.add(0, flowEdgeContext);
+ } else {
+ prioritizedEdgeList.add(flowEdgeContext);
+ }
+ foundExecutor = true;
+ }
+ }
+ // Found a SpecExecutor. Proceed to the next FlowEdge.
+ // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+ if (foundExecutor) {
+ break;
+ }
+ }
+ } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+ | JobTemplate.TemplateException e) {
+ //Skip the edge; and continue
+ log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
+ }
+ }
+ return prioritizedEdgeList;
+ }
+
+ /**
+ * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below):
+ * <ul>
+ * <p> the user provided flow config </p>
+ * <p> edge specific properties/overrides </p>
+ * <p> spec executor config/overrides </p>
+ * <p> source node config </p>
+ * <p> destination node config </p>
+ * </ul>
+ * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+ * @param flowEdge An instance of {@link FlowEdge}.
+ * @param specExecutor A {@link SpecExecutor}.
+ * @return the merged config derived as described above.
+ */
+ private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+ throws ExecutionException, InterruptedException {
+ Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+ Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+ Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
+ .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+ return mergedConfig;
+ }
+
+ /**
+ *
+ * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+ * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}.
+ * @throws IOException
+ * @throws SpecNotFoundException
+ * @throws JobTemplate.TemplateException
+ * @throws URISyntaxException
+ */
+ private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+ throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+ //Backtrace from the last edge using the path map and push each edge into a LIFO data structure.
+ List<FlowEdgeContext> path = new LinkedList<>();
+ path.add(flowEdgeContext);
+ FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+ while (true) {
+ path.add(0, this.pathMap.get(currentFlowEdgeContext));
+ currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+ //Are we at the first edge in the path?
+ if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
+ break;
+ }
+ }
+ FlowGraphPath flowGraphPath = new FlowGraphPath(path, flowSpec, flowExecutionId);
+ return flowGraphPath;
+ }
+
+ public static class PathFinderException extends Exception {
+ public PathFinderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PathFinderException(String message) {
+ super(message);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
new file mode 100644
index 0000000..8b14b10
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+
+/***
+ * Take in a logical {@link Spec} ie flow and compile corresponding materialized job {@link Spec}
+ * and its mapping to {@link SpecExecutor}.
+ */
+@Alpha
+@Slf4j
+public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+ @Getter
+ private FlowGraph flowGraph;
+ private GitFlowGraphMonitor gitFlowGraphMonitor;
+ @Getter
+ private boolean active;
+
+ public MultiHopFlowCompiler(Config config) {
+ this(config, true);
+ }
+
+ public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
+ this(config, Optional.<Logger>absent(), instrumentationEnabled);
+ }
+
+ public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
+ this(config, log, true);
+ }
+
+ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+ super(config, log, instrumentationEnabled);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog flowCatalog;
+ try {
+ flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+ }
+ this.flowGraph = new BaseFlowGraph();
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+ }
+
+ public void setActive(boolean active) {
+ this.active = active;
+ this.gitFlowGraphMonitor.setActive(active);
+ }
+
+ /**
+ * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
+ * job dependencies.
+ * @param spec
+ * @return
+ */
+ @Override
+ public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+ Preconditions.checkNotNull(spec);
+ Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs");
+
+ long startTime = System.nanoTime();
+ Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
+
+ FlowSpec flowSpec = (FlowSpec) spec;
+ String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
+ String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+ log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
+
+ FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
+ try {
+ //Compute the path from source to destination.
+ FlowGraphPath flowGraphPath = pathFinder.findPath();
+
+ //Convert the path into a Dag of JobExecutionPlans.
+ Dag<JobExecutionPlan> jobExecutionPlanDag;
+ if (flowGraphPath != null) {
+ jobExecutionPlanDag = flowGraphPath.asDag();
+ } else {
+ Instrumented.markMeter(this.flowCompilationFailedMeter);
+ log.info(String.format("No path found from source: %s and destination: %s", source, destination));
+ return null;
+ }
+
+ //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
+ // of a Map. For now just add all specs into the map.
+ for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = node.getValue();
+ specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
+ }
+ } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
+ | URISyntaxException e) {
+ Instrumented.markMeter(this.flowCompilationFailedMeter);
+ log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e);
+ return null;
+ }
+ Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+ Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+ return specExecutorMap;
+ }
+
+ @Override
+ protected void populateEdgeTemplateMap() {
+ log.warn("No population of templates based on edge happen in this implementation");
+ return;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 731bc22..4fb9711 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,9 +19,9 @@ package org.apache.gobblin.service.modules.flowgraph;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.util.ConfigUtils;
import joptsimple.internal.Strings;
@@ -38,7 +38,7 @@ public class BaseDataNode implements DataNode {
@Getter
private String id;
@Getter
- private Config props;
+ private Config rawConfig;
@Getter
private boolean active = true;
@@ -50,8 +50,8 @@ public class BaseDataNode implements DataNode {
if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
}
- this.props = nodeProps;
- } catch(Exception e) {
+ this.rawConfig = nodeProps;
+ } catch (Exception e) {
throw new DataNodeCreationException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index fc82cc1..56f6c1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -44,7 +44,10 @@ import org.apache.gobblin.util.ConfigUtils;
@Alpha
public class BaseFlowEdge implements FlowEdge {
@Getter
- protected List<String> endPoints;
+ protected String src;
+
+ @Getter
+ protected String dest;
@Getter
protected FlowTemplate flowTemplate;
@@ -53,7 +56,7 @@ public class BaseFlowEdge implements FlowEdge {
private List<SpecExecutor> executors;
@Getter
- private Config props;
+ private Config config;
@Getter
private String id;
@@ -63,11 +66,12 @@ public class BaseFlowEdge implements FlowEdge {
//Constructor
public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
- this.endPoints = endPoints;
+ this.src = endPoints.get(0);
+ this.dest = endPoints.get(1);
this.flowTemplate = flowTemplate;
this.executors = executors;
this.active = active;
- this.props = properties;
+ this.config = properties;
this.id = edgeId;
}
@@ -91,7 +95,7 @@ public class BaseFlowEdge implements FlowEdge {
FlowEdge that = (FlowEdge) o;
- if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+ if (!(this.getSrc().equals(that.getSrc())) && ((this.getDest()).equals(that.getDest()))) {
return false;
}
@@ -140,14 +144,14 @@ public class BaseFlowEdge implements FlowEdge {
specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
}
- String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "");
+ String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "");
//Perform basic validation
Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points");
Preconditions
.checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor");
Preconditions
- .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty");
+ .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty");
boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
//Build SpecExecutor from config
@@ -158,7 +162,7 @@ public class BaseFlowEdge implements FlowEdge {
SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig);
specExecutors.add(executor);
}
- FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
+ FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri));
return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
} catch (RuntimeException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 783f7ea..edf40cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -75,8 +75,8 @@ public class BaseFlowGraph implements FlowGraph {
*/
@Override
public synchronized boolean addFlowEdge(FlowEdge edge) {
- String srcNode = edge.getEndPoints().get(0);
- String dstNode = edge.getEndPoints().get(1);
+ String srcNode = edge.getSrc();
+ String dstNode = edge.getDest();
if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) {
return false;
}
@@ -153,10 +153,10 @@ public class BaseFlowGraph implements FlowGraph {
* if the {@link FlowEdge} is not in the graph, return false.
*/
public synchronized boolean deleteFlowEdge(FlowEdge edge) {
- if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) {
+ if(!dataNodeMap.containsKey(edge.getSrc())) {
return false;
}
- DataNode node = dataNodeMap.get(edge.getEndPoints().get(0));
+ DataNode node = dataNodeMap.get(edge.getSrc());
if(!nodesToEdges.get(node).contains(edge)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 8ae0027..58bbb81 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -87,6 +87,10 @@ public class Dag<T> {
return node.parentNodes;
}
+ public boolean isEmpty() {
+ return this.nodes.isEmpty();
+ }
+
/**
* Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag.
* The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes)
@@ -97,9 +101,12 @@ public class Dag<T> {
* @return the concatenated dag
*/
public Dag<T> concatenate(Dag<T> other) throws IOException {
- if (other == null) {
+ if (other == null || other.isEmpty()) {
return this;
}
+ if (this.isEmpty()) {
+ return other;
+ }
for (DagNode node : this.endNodes) {
this.parentChildMap.put(node, Lists.newArrayList());
for (DagNode otherNode : other.startNodes) {
@@ -108,6 +115,7 @@ public class Dag<T> {
}
this.endNodes = other.endNodes;
}
+ this.nodes.addAll(other.nodes);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
index 4931685..b7a5274 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flowgraph;
import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
/**
@@ -37,7 +36,7 @@ public interface DataNode {
* @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}
* e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node.
*/
- Config getProps();
+ Config getRawConfig();
/**
* @return true if the {@link DataNode} is active
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index e98337d..23e20c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -22,7 +22,25 @@ package org.apache.gobblin.service.modules.flowgraph;
*/
public class DatasetDescriptorConfigKeys {
//Gobblin Service Dataset Descriptor related keys
+ public static final String FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.input.dataset.descriptor";
+ public static final String FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.output.dataset.descriptor";
+
+ public static final String FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.input.dataset.descriptor";
+ public static final String FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.output.dataset.descriptor";
+
+ public static final String CLASS_KEY = "class";
+ public static final String PLATFORM_KEY = "platform";
public static final String PATH_KEY = "path";
public static final String FORMAT_KEY = "format";
+ public static final String CODEC_KEY = "codec";
public static final String DESCRIPTION_KEY = "description";
+
+ //Dataset encryption related keys
+ public static final String ENCYPTION_PREFIX = "encrypt";
+ public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
+ public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
+ public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding";
+
+ public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
deleted file mode 100644
index 5899645..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
-
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
-
-/**
- * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
- * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
- */
-@Alpha
-public abstract class FileSystemDataNode extends BaseDataNode {
- public static final String FS_URI_KEY = "fs.uri";
- @Getter
- private String fsUri;
-
- /**
- * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id.
- */
- public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- try {
- this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode");
- URI uri = new URI(this.fsUri);
- if(!isUriValid(uri)) {
- throw new IOException("Invalid FS URI " + this.fsUri);
- }
- } catch(Exception e) {
- throw new DataNodeCreationException(e);
- }
- }
-
- public abstract boolean isUriValid(URI fsUri);
- /**
- * Two HDFS DataNodes are the same if they have the same id and the same fsUri.
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FileSystemDataNode that = (FileSystemDataNode) o;
-
- return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri());
- }
-
- @Override
- public int hashCode() {
- return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
index fb60d67..497bd5b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java
@@ -41,9 +41,15 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
public interface FlowEdge {
/**
*
- * @return the {@link DataNode} ids that are the end points of the edge.
+ * @return the source {@link DataNode} id of the edge.
*/
- List<String> getEndPoints();
+ String getSrc();
+
+ /**
+ *
+ * @return the destination {@link DataNode} id of the edge.
+ */
+ String getDest();
/**
*
@@ -62,7 +68,7 @@ public interface FlowEdge {
* is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}.
* @return the properties of this edge as a {@link Config} object.
*/
- Config getProps();
+ Config getConfig();
/**
* A string uniquely identifying the edge.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
index 23f5793..b4aa7bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java
@@ -32,6 +32,13 @@ import org.apache.gobblin.annotation.Alpha;
public interface FlowGraph {
/**
+ * Get a {@link DataNode} from the node identifier
+ * @param nodeId {@link DataNode} identifier.
+ * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}.
+ */
+ public DataNode getNode(String nodeId);
+
+ /**
* Add a {@link DataNode} to the {@link FlowGraph}
* @param node {@link DataNode} to be added
* @return true if {@link DataNode} is added to the {@link FlowGraph} successfully.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index cd4876a..8a49ec0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -22,7 +22,7 @@ public class FlowGraphConfigurationKeys {
public static final String FLOW_EDGE_PREFIX = "flow.edge.";
/**
- * {@link DataNode} configuration keys.
+ * {@link DataNode} related configuration keys.
*/
public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
@@ -30,7 +30,7 @@ public class FlowGraphConfigurationKeys {
public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
/**
- * {@link FlowEdge} configuration keys.
+ * {@link FlowEdge} related configuration keys.
*/
public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
@@ -39,7 +39,7 @@ public class FlowGraphConfigurationKeys {
public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
- public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
+ public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri";
public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
- public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
+ public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
deleted file mode 100644
index 7bcc18d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import joptsimple.internal.Strings;
-
-
-/**
- * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class HdfsDataNode extends FileSystemDataNode {
- public static final String HDFS_SCHEME = "hdfs";
-
- public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- }
-
- /**
- *
- * @param fsUri FileSystem URI
- * @return true if the scheme is "hdfs" and authority is not empty.
- */
- @Override
- public boolean isUriValid(URI fsUri) {
- String scheme = fsUri.getScheme();
- //Check that the scheme is "hdfs"
- if(!scheme.equals(HDFS_SCHEME)) {
- return false;
- }
- //Ensure that the authority is not empty
- if(Strings.isNullOrEmpty(fsUri.getAuthority())) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
deleted file mode 100644
index 6dc1aa3..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-
-import org.apache.gobblin.annotation.Alpha;
-
-import com.typesafe.config.Config;
-
-/**
- * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
- * are validated here.
- */
-@Alpha
-public class LocalFSDataNode extends FileSystemDataNode {
- public static final String LOCAL_FS_SCHEME = "file";
-
- public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
- super(nodeProps);
- }
-
- /**
- *
- * @param fsUri FileSystem URI
- * @return true if the scheme of fsUri equals "file"
- */
- @Override
- public boolean isUriValid(URI fsUri) {
- String scheme = fsUri.getScheme();
- if(scheme.equals(LOCAL_FS_SCHEME)) {
- return true;
- }
- return false;
- }
-}
[2/4] incubator-gobblin git commit: [GOBBLIN-528] Multihop Flow
Compiler for Gobblin-as-a-Service (GaaS).
Posted by hu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
new file mode 100644
index 0000000..5d0500c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class FlowGraphPathFinderTest {
+ private FlowGraph flowGraph;
+ private FlowGraphPathFinder pathFinder;
+
+ @BeforeClass
+ public void setUp()
+ throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException {
+ //Create a FlowGraph
+ this.flowGraph = new BaseFlowGraph();
+
+ //Add DataNodes to the graph from the node properties files
+ URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI();
+ FileSystem fs = FileSystem.get(dataNodesUri, new Configuration());
+ Path dataNodesPath = new Path(dataNodesUri);
+ ConfigParseOptions options = ConfigParseOptions.defaults()
+ .setSyntax(ConfigSyntax.PROPERTIES)
+ .setAllowMissing(false);
+
+ for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) {
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class dataNodeClass = Class.forName(ConfigUtils
+ .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+ DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+ this.flowGraph.addDataNode(dataNode);
+ }
+ }
+
+    //Locate the flow template catalog on the classpath for use by the FSFlowCatalog created below
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ // Create a FSFlowCatalog instance
+ Properties properties = new Properties();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+
+ //Add FlowEdges from the edge properties files
+ URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI();
+ fs = FileSystem.get(flowEdgesURI, new Configuration());
+ Path flowEdgesPath = new Path(flowEdgesURI);
+ for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) {
+ log.warn(fileStatus.getPath().toString());
+ try (InputStream is = fs.open(fileStatus.getPath())) {
+ Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options);
+ Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+ FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+ FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog);
+ this.flowGraph.addFlowEdge(edge);
+ }
+ }
+
+ //Create a flow spec
+ Properties flowProperties = new Properties();
+ flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+ flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup");
+ flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
+ flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1");
+ flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1");
+ Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
+
+ //Get the input/output dataset config from a file
+ URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI();
+ Path flowConfigPath = new Path(flowConfigUri);
+ FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration());
+ try (InputStream is = fs1.open(flowConfigPath)) {
+ Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset()));
+ flowConfig = flowConfig.withFallback(datasetConfig).resolve();
+ }
+
+ FlowSpec.Builder flowSpecBuilder = null;
+ flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri())
+ .withConfig(flowConfig)
+ .withDescription("dummy description")
+ .withVersion(FlowSpec.Builder.DEFAULT_VERSION);
+
+ FlowSpec spec = flowSpecBuilder.build();
+ this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec);
+ }
+
+ @Test
+ public void testFindPath()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+ Assert.assertEquals(jobDag.getNodes().size(), 4);
+ Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+ Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+ //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
+ Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+ JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
+ JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
+
+ //Ensure the resolved job config for the first hop has the correct substitutions.
+ Config jobConfig = jobSpec.getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ String from = jobConfig.getString("from");
+ String to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+ String sourceFsUri = jobConfig.getString("fs.uri");
+ Assert.assertEquals(sourceFsUri, "file:///");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+ Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+ String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+ Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+ Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ Assert.assertEquals(jobConfig.getString("type"), "java");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+ //Ensure the spec executor has the correct configurations
+ SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+ //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+ Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+ jobSpecWithExecutor = secondHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
+ Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+ jobSpecWithExecutor = thirdHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
+ Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+ jobSpecWithExecutor = fourthHopNode.getValue();
+ jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+ Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+ Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobSpecWithExecutor.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Ensure the fourth hop is the last
+ Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+ }
+
+ @Test (dependsOnMethods = "testFindPath")
+ public void testFindPathAfterFirstEdgeDeletion()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
+ this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
+
+ Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag();
+ Assert.assertEquals(jobDag.getNodes().size(), 4);
+ Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+ Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+
+ //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
+ Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+ JobExecutionPlan jobExecutionPlan = startNode.getValue();
+ JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+
+ //Ensure the resolved job config for the first hop has the correct substitutions.
+ Config jobConfig = jobSpec.getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ String from = jobConfig.getString("from");
+ String to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/out/testTeam/testDataset");
+ String sourceFsUri = jobConfig.getString("fs.uri");
+ Assert.assertEquals(sourceFsUri, "file:///");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri);
+ Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri);
+ String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
+ Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
+ Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ Assert.assertEquals(jobConfig.getString("type"), "java");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL");
+ //Ensure the spec executor has the correct configurations
+ SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+
+ //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
+ Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0);
+ jobExecutionPlan = secondHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/out/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from);
+ Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
+ Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0);
+ jobExecutionPlan = thirdHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
+ Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
+ Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0);
+ jobExecutionPlan = fourthHopNode.getValue();
+ jobConfig = jobExecutionPlan.getJobSpec().getConfig();
+ Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL");
+ from = jobConfig.getString("from");
+ to = jobConfig.getString("to");
+ Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset");
+ Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/");
+ Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/");
+ Assert.assertEquals(jobConfig.getString("type"), "hadoopJava");
+ Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher");
+ Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE");
+ Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234");
+ Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential");
+ Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password");
+ //Ensure the spec executor has the correct configurations
+ specExecutor = jobExecutionPlan.getSpecExecutor();
+ Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443");
+ Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor");
+
+ //Ensure the fourth hop is the last
+ Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
+ }
+
+ @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion")
+ public void testFindPathAfterSecondEdgeDeletion()
+ throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException,
+ SpecNotFoundException, IOException {
+ //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
+ this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
+
+ //Ensure no path to destination.
+ Assert.assertNull(pathFinder.findPath());
+ }
+
+ @AfterClass
+ public void tearDown() {
+ }
+
+ public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
+ // Executor Instance
+ protected final Config config;
+
+ private SpecProducer<Spec> azkabanSpecProducer;
+
+ public TestAzkabanSpecExecutor(Config config) {
+ super(config);
+ this.config = config;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ //Do nothing
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ //Do nothing
+ }
+
+ @Override
+ public Future<String> getDescription() {
+ return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+ }
+
+ @Override
+ public Future<? extends SpecProducer> getProducer() {
+ return new CompletedFuture<>(this.azkabanSpecProducer, null);
+ }
+
+ @Override
+ public Future<Config> getConfig() {
+ return new CompletedFuture<>(config, null);
+ }
+
+ @Override
+ public Future<String> getHealth() {
+ return new CompletedFuture<>("Healthy", null);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
new file mode 100644
index 0000000..2694f5c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BaseFlowEdgeFactoryTest {
+ @Test
+ public void testCreateFlowEdge() throws Exception {
+ Properties properties = new Properties();
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
+ properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
+
+ FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
+
+ Properties props = new Properties();
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(props);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+ Config edgeProps = ConfigUtils.propertiesToConfig(properties);
+ FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
+ Assert.assertEquals(flowEdge.getSrc(), "node1");
+ Assert.assertEquals(flowEdge.getDest(), "node2");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 04f2270..be7b597 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.flowgraph;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
@@ -33,8 +32,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.apache.gobblin.service.modules.template.StaticFlowTemplate;
import org.apache.gobblin.util.ConfigUtils;
@@ -57,9 +54,7 @@ public class BaseFlowGraphTest {
BaseFlowGraph graph;
@BeforeClass
- public void setUp()
- throws URISyntaxException, ReflectiveOperationException, JobTemplate.TemplateException, SpecNotFoundException,
- IOException, DataNode.DataNodeCreationException {
+ public void setUp() throws URISyntaxException, DataNode.DataNodeCreationException {
Properties properties = new Properties();
properties.put("key1", "val1");
Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
@@ -81,9 +76,9 @@ public class BaseFlowGraphTest {
//Create a clone of node3
node3c = new BaseDataNode(node3Config);
- FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
- FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
- FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
+ FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null);
+ FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null);
+ FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null);
//Create edge instances
edgeId1 = "node1:node2:edge1";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
new file mode 100644
index 0000000..2542f5e
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class JobExecutionPlanDagFactoryTest {
+ private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+ private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME;
+ private SpecExecutor specExecutor;
+ private List<JobTemplate> jobTemplates;
+
+ @BeforeClass
+ public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
+ // Create a FSFlowCatalog instance
+ URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ Properties properties = new Properties();
+ properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
+ FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+ this.jobTemplates = flowTemplate.getJobTemplates();
+
+ //Create a spec executor instance
+ properties = new Properties();
+ properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir");
+ properties.put("specExecInstance.capabilities", "source:destination");
+ Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties);
+ this.specExecutor = new InMemorySpecExecutor(specExecutorConfig);
+ }
+
+ @Test
+ public void testCreateDag() throws Exception {
+ //Create a list of JobExecutionPlans
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ for (JobTemplate jobTemplate: this.jobTemplates) {
+ String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
+ jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+ withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor));
+ }
+
+ //Create a DAG from job execution plans.
+ Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+
+ //Test DAG properties
+ Assert.assertEquals(dag.getStartNodes().size(), 1);
+ Assert.assertEquals(dag.getEndNodes().size(), 1);
+ Assert.assertEquals(dag.getNodes().size(), 4);
+ String startNodeName = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getUri()).getName();
+ Assert.assertEquals(startNodeName, "job1");
+ String templateUri = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+ Assert.assertEquals(templateUri, "job1.job");
+ String endNodeName = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getUri()).getName();
+ Assert.assertEquals(endNodeName, "job4");
+ templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName();
+ Assert.assertEquals(templateUri, "job4.job");
+
+ Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0);
+ List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode);
+ Set<String> nodeSet = new HashSet<>();
+ for (Dag.DagNode<JobExecutionPlan> node: nextNodes) {
+ nodeSet.add(new Path(node.getValue().getJobSpec().getUri()).getName());
+ Dag.DagNode<JobExecutionPlan> nextNode = dag.getChildren(node).get(0);
+ Assert.assertEquals(new Path(nextNode.getValue().getJobSpec().getUri()).getName(), "job4");
+ }
+ Assert.assertTrue(nodeSet.contains("job2"));
+ Assert.assertTrue(nodeSet.contains("job3"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
deleted file mode 100644
index 58d879e..0000000
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-public class JobTemplateDagFactoryTest {
- private static final String TEST_TEMPLATE_NAME = "test-template";
- private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
- private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
- FSFlowCatalog catalog;
-
- @BeforeClass
- public void setUp()
- throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException {
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
-
- // Create a FSFlowCatalog instance
- Properties properties = new Properties();
- properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(properties);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.catalog = new FSFlowCatalog(templateCatalogCfg);
- }
-
- @Test
- public void testCreateDagFromJobTemplates() throws Exception {
- FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
- List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
-
- //Create a DAG from job templates.
- Dag<JobTemplate> jobTemplateDag = JobTemplateDagFactory.createDagFromJobTemplates(jobTemplates);
-
- //Test DAG properties
- Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getNodes().size(), 4);
- String startNodeName = new Path(jobTemplateDag.getStartNodes().get(0).getValue().getUri()).getName();
- Assert.assertEquals(startNodeName, "job1.conf");
- String endNodeName = new Path(jobTemplateDag.getEndNodes().get(0).getValue().getUri()).getName();
- Assert.assertEquals(endNodeName, "job4.conf");
-
- Dag.DagNode<JobTemplate> startNode = jobTemplateDag.getStartNodes().get(0);
- List<Dag.DagNode<JobTemplate>> nextNodes = jobTemplateDag.getChildren(startNode);
- Set<String> nodeSet = new HashSet<>();
- for(Dag.DagNode<JobTemplate> node: nextNodes) {
- nodeSet.add(new Path(node.getValue().getUri()).getName());
- Dag.DagNode<JobTemplate> nextNode = jobTemplateDag.getChildren(node).get(0);
- Assert.assertEquals(new Path(nextNode.getValue().getUri()).getName(), "job4.conf");
- }
- Assert.assertTrue(nodeSet.contains("job2.conf"));
- Assert.assertTrue(nodeSet.contains("job3.conf"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
index b20606f..3c8ebd3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java
@@ -23,19 +23,20 @@ import java.util.Properties;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.dataset.HdfsDatasetDescriptor;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.testng.collections.Lists;
@@ -43,9 +44,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FSFlowCatalogTest {
- private static final String TEST_TEMPLATE_NAME = "test-template";
- private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf";
- private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME;
+ private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+ private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME;
@Test
public void testGetFlowTemplate() throws Exception {
@@ -58,50 +58,45 @@ public class FSFlowCatalogTest {
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
- FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI));
+ FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI));
//Basic sanity check for the FlowTemplate
- Dag<JobTemplate> jobTemplateDag = flowTemplate.getDag();
- List<Dag.DagNode<JobTemplate>> dagNodes = jobTemplateDag.getNodes();
- Assert.assertTrue(dagNodes.size() == 4);
- Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1);
- Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1);
- Dag.DagNode<JobTemplate> dagNode = jobTemplateDag.getStartNodes().get(0);
- URI startNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job1.conf").toURI();
- URI endNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job4.conf").toURI();
- Assert.assertEquals(jobTemplateDag.getStartNodes().get(0).getValue().getUri(), startNodeUri);
- Assert.assertEquals(jobTemplateDag.getEndNodes().get(0).getValue().getUri(), endNodeUri);
List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates();
Assert.assertEquals(jobTemplates.size(), 4);
- for(int i=0; i<4; i++) {
+ for (int i = 0; i < 4; i++) {
String uri = new Path(jobTemplates.get(i).getUri()).getName().split("\\.")[0];
String templateId = uri.substring(uri.length() - 1);
- for(int j=0; j<2; j++) {
+ for (int j = 0; j < 2; j++) {
Config jobTemplateConfig = jobTemplates.get(i).getRawTemplateConfig();
- String suffix = templateId + Integer.toString(j+1);
+ String suffix = templateId + Integer.toString(j + 1);
Assert.assertEquals(jobTemplateConfig.getString("key" + suffix), "val" + suffix);
}
}
- List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getInputOutputDatasetDescriptors();
+ Config flowConfig = ConfigFactory.empty().withValue("team.name", ConfigValueFactory.fromAnyRef("test-team"))
+ .withValue("dataset.name", ConfigValueFactory.fromAnyRef("test-dataset"));
+
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getResolvingDatasetDescriptors(flowConfig);
Assert.assertTrue(inputOutputDescriptors.size() == 2);
List<String> dirs = Lists.newArrayList("inbound", "outbound");
- for(int i=0; i<2; i++) {
- for (int j=0; j<2; j++) {
- HdfsDatasetDescriptor datasetDescriptor;
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < 2; j++) {
+ FSDatasetDescriptor datasetDescriptor;
if (j == 0) {
- datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
+ datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getLeft();
} else {
- datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
+ datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getRight();
}
Assert.assertEquals(datasetDescriptor.getPlatform(), "hdfs");
- Assert.assertEquals(datasetDescriptor.getFormat(), "avro");
- Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/<TEAM_NAME>/<DATASET_NAME>");
+ Assert.assertEquals(datasetDescriptor.getFormatConfig().getFormat(), "avro");
+ Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/test-team/test-dataset");
}
}
Config flowTemplateConfig = flowTemplate.getRawTemplateConfig();
- Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.input.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
- Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.output.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor");
+ Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX + ".0."
+ + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
+ Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX
+ + ".0." + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flow/flow.conf
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf
new file mode 100644
index 0000000..f818df6
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow.conf
@@ -0,0 +1,24 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+adls.user.to.proxy=adlsTestUser
+adls.oauth2.client.id=1234
+adls.oauth2.credential=credential
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - compressed and encrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=adls
+gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.output.dataset.descriptor.format=json
+gobblin.flow.output.dataset.descriptor.codec=gzip
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
new file mode 100644
index 0000000..a219e4f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=ADLS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.AdlsDataNode
+data.node.fs.uri=adl://azuredatalakestore.net/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
new file mode 100644
index 0000000..cad5e03
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn01.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
new file mode 100644
index 0000000..eeb7980
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-2
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn02.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
new file mode 100644
index 0000000..61135ba
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-3
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn03.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
new file mode 100644
index 0000000..a772f1c
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties
@@ -0,0 +1,3 @@
+data.node.id=HDFS-4
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode
+data.node.fs.uri=hdfs://hadoopnn04.grid.linkedin.com:8888/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
new file mode 100644
index 0000000..6683221
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=LocalFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.LocalFSDataNode
+data.node.fs.uri=file:///
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
new file mode 100644
index 0000000..bcf6d44
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
new file mode 100644
index 0000000..99d1ed7
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties
@@ -0,0 +1,10 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-3
+flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
new file mode 100644
index 0000000..537cbfa
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-2
+flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
new file mode 100644
index 0000000..6ec2ea5
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-4
+flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
new file mode 100644
index 0000000..ed6e899
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-3
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
new file mode 100644
index 0000000..eae2767
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties
@@ -0,0 +1,13 @@
+flow.edge.source=HDFS-4
+flow.edge.destination=ADLS-1
+flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
+
+# Proxy config
+flow.edge.proxy.host=adl-proxy.linkedin.com
+flow.edge.proxy.port=1234
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
new file mode 100644
index 0000000..268b67f
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=LocalFS-1:HDFS-1:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
new file mode 100644
index 0000000..bc67810
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=HDFS-2
+flow.edge.id=LocalFS-1:HDFS-2:localToHdfs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
deleted file mode 100644
index 43fa9a3..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.dircache.DirCache;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.RepositoryCache;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-
-
-public class GitFlowGraphMonitorTest {
- private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);
- private Repository remoteRepo;
- private Git gitForPush;
- private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
- private final File remoteDir = new File(TEST_DIR + "/remote");
- private final File cloneDir = new File(TEST_DIR + "/clone");
- private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
- private static final String NODE_1_FILE = "node1.properties";
- private final File node1Dir = new File(flowGraphDir, "node1");
- private final File node1File = new File(node1Dir, NODE_1_FILE);
- private static final String NODE_2_FILE = "node2.properties";
- private final File node2Dir = new File(flowGraphDir, "node2");
- private final File node2File = new File(node2Dir, NODE_2_FILE);
- private final File edge1Dir = new File(node1Dir, "node2");
- private final File edge1File = new File(edge1Dir, "edge1.properties");
-
- private RefSpec masterRefSpec = new RefSpec("master");
- private FSFlowCatalog flowCatalog;
- private Config config;
- private BaseFlowGraph flowGraph;
- private GitFlowGraphMonitor gitFlowGraphMonitor;
-
- @BeforeClass
- public void setUp() throws Exception {
- cleanUpDir(TEST_DIR);
-
- // Create a bare repository
- RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
- this.remoteRepo = fileKey.open(false);
- this.remoteRepo.create(true);
-
- this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
-
- // push an empty commit as a base for detecting changes
- this.gitForPush.commit().setMessage("First commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.config = ConfigBuilder.create()
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
- + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
- .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
- .build();
-
- // Create a FSFlowCatalog instance
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- Properties properties = new Properties();
- properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(properties);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-
- //Create a FlowGraph instance with defaults
- this.flowGraph = new BaseFlowGraph();
-
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
- this.gitFlowGraphMonitor.setActive(true);
- }
-
- private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
- throws IOException, GitAPIException {
- // push a new node file
- nodeDir.mkdirs();
- nodeFile.createNewFile();
- Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
-
- // add, commit, push node
- this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
- this.gitForPush.commit().setMessage("Node commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if node1 has been added to the FlowGraph
- DataNode dataNode = this.flowGraph.getNode(nodeId);
- Assert.assertEquals(dataNode.getId(), nodeId);
- Assert.assertTrue(dataNode.isActive());
- Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
- }
-
- @Test
- public void testAddNode()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
- testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
- }
-
- @Test (dependsOnMethods = "testAddNode")
- public void testAddEdge()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- // push a new node file
- this.edge1Dir.mkdirs();
- this.edge1File.createNewFile();
-
- Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
-
- // add, commit, push
- this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
- this.gitForPush.commit().setMessage("Edge commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if edge1 has been added to the FlowGraph
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
- FlowEdge flowEdge = edgeSet.iterator().next();
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
- }
-
- @Test (dependsOnMethods = "testAddNode")
- public void testUpdateEdge()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- //Update edge1 file
- Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
- + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
- + "key1=value1\n", edge1File, Charsets.UTF_8);
-
- // add, commit, push
- this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
- this.gitForPush.commit().setMessage("Edge commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if new edge1 has been added to the FlowGraph
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
- FlowEdge flowEdge = edgeSet.iterator().next();
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
- }
-
- @Test (dependsOnMethods = "testUpdateEdge")
- public void testUpdateNode()
- throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
- //Update param1 value in node1 and check if updated node is added to the graph
- testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
- }
-
-
- @Test (dependsOnMethods = "testUpdateNode")
- public void testRemoveEdge() throws GitAPIException, IOException {
- // delete a config file
- edge1File.delete();
-
- //Node1 has 1 edge before delete
- Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertEquals(edgeSet.size(), 1);
-
- // delete, commit, push
- DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
- this.edge1Dir.getName(), this.edge1File.getName())).call();
- RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if edge1 has been deleted from the graph
- edgeSet = this.flowGraph.getEdges("node1");
- Assert.assertTrue(edgeSet.size() == 0);
- }
-
- @Test (dependsOnMethods = "testRemoveEdge")
- public void testRemoveNode() throws GitAPIException, IOException {
- //delete node file
- node1File.delete();
-
- //node1 is present in the graph before delete
- DataNode node1 = this.flowGraph.getNode("node1");
- Assert.assertNotNull(node1);
-
- // delete, commit, push
- DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
- RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call();
- this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
-
- this.gitFlowGraphMonitor.processGitConfigChanges();
-
- //Check if node1 has been deleted from the graph
- node1 = this.flowGraph.getNode("node1");
- Assert.assertNull(node1);
- }
-
-
- private void cleanUpDir(String dir) {
- File specStoreDir = new File(dir);
-
- // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
- for (int i = 0; i < 5; i++) {
- try {
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
- // if delete succeeded then break out of loop
- break;
- } catch (IOException e) {
- logger.warn("Cleanup delete directory failed for directory: " + dir, e);
- }
- }
- }
-
- private String formNodeFilePath(String groupDir, String fileName) {
- return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
- }
-
- private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
- return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- cleanUpDir(TEST_DIR);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
deleted file mode 100644
index 9dd51a0..0000000
--- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flowgraph;
-
-import java.net.URI;
-import java.util.Properties;
-
-import org.apache.gobblin.util.ConfigUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class BaseFlowEdgeFactoryTest {
- @Test
- public void testCreateFlowEdge() throws Exception {
- Properties properties = new Properties();
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "FS:///test-template/flow.conf");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2");
- properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2");
-
- FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory();
-
- Properties props = new Properties();
- URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
- props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString());
- Config config = ConfigFactory.parseProperties(props);
- Config templateCatalogCfg = config
- .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg);
- Config edgeProps = ConfigUtils.propertiesToConfig(properties);
- FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog);
- Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
- Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2");
- Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor");
- Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor");
- }
-}
\ No newline at end of file