You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:02 UTC
[incubator-liminal] 22/43: Add job_start and job_end tasks
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 75965668ccf2e75b2f261ce0ca2d5d26ce117852
Author: zionrubin <zi...@naturalint.com>
AuthorDate: Sun Mar 22 10:47:52 2020 +0200
Add job_start and job_end tasks
---
rainbow/runners/airflow/dag/rainbow_dags.py | 21 ++++--
.../{ => airflow/tasks/defaults}/__init__.py | 0
.../tasks/{job_end.py => defaults/default_task.py} | 14 +++-
.../airflow/tasks/{ => defaults}/job_end.py | 21 ++++--
.../tasks/{job_end.py => defaults/job_end.py~HEAD} | 21 ++++--
.../airflow/tasks/{ => defaults}/job_start.py | 20 ++++--
.../{job_start.py => defaults/job_start.py~HEAD} | 20 ++++--
tests/runners/airflow/dag/test_rainbow_dags.py | 31 +++++++--
tests/runners/airflow/rainbow/rainbow.yml | 8 ++-
.../runners/airflow/tasks/defaults}/__init__.py | 0
.../runners/airflow/tasks/defaults/test_job_end.py | 77 ++++++++++++++++++++++
.../airflow/tasks/defaults/test_job_start.py | 77 ++++++++++++++++++++++
12 files changed, 276 insertions(+), 34 deletions(-)
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 15b7d9a..71d18d2 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -22,8 +22,11 @@ import yaml
from airflow import DAG
from airflow.models import Variable
-from rainbow.core.util import files_util, class_util
+from rainbow.core.util import class_util
+from rainbow.core.util import files_util
from rainbow.runners.airflow.model.task import Task
+from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask
+from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask
def register_dags(configs_path):
@@ -42,8 +45,6 @@ def register_dags(configs_path):
config = yaml.safe_load(stream)
for pipeline in config['pipelines']:
- parent = None
-
pipeline_name = pipeline['pipeline']
default_args = {
@@ -58,6 +59,9 @@ def register_dags(configs_path):
catchup=False
)
+ job_start_task = JobStartTask(dag, pipeline_name, None, pipeline, 'all_success')
+ parent = job_start_task.apply_task_to_dag()
+
trigger_rule = 'all_success'
if 'always_run' in config and config['always_run']:
trigger_rule = 'all_done'
@@ -70,12 +74,15 @@ def register_dags(configs_path):
parent = task_instance.apply_task_to_dag()
- print(f'{pipeline_name}: {dag.tasks}')
+ job_end_task = JobEndTask(dag, pipeline_name, parent, pipeline, 'all_done')
+ job_end_task.apply_task_to_dag()
+
+ print(f'{pipeline_name}: {dag.tasks}')
- globals()[pipeline_name] = dag
+ globals()[pipeline_name] = dag
- dags.append(dag)
- return dags
+ dags.append(dag)
+ return dags
print(f'Loading task implementations..')
diff --git a/rainbow/runners/__init__.py b/rainbow/runners/airflow/tasks/defaults/__init__.py
similarity index 100%
copy from rainbow/runners/__init__.py
copy to rainbow/runners/airflow/tasks/defaults/__init__.py
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/default_task.py
similarity index 74%
copy from rainbow/runners/airflow/tasks/job_end.py
copy to rainbow/runners/airflow/tasks/defaults/default_task.py
index 42b5e7f..0e901fc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/default_task.py
@@ -15,17 +15,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Default base task.
+"""
+from abc import abstractmethod
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.model.task import Task
-class JobEndTask(task.Task):
+class DefaultTask(Task):
"""
- Job end task. Reports job end metrics.
+ Default Base task.
"""
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+ metrics = self.config.get('metrics', {})
+ self.metrics_namespace = metrics.get('namespace', '')
+ self.metrics_backends = metrics.get('backends', [])
+ @abstractmethod
def apply_task_to_dag(self):
pass
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py
similarity index 61%
copy from rainbow/runners/airflow/tasks/job_end.py
copy to rainbow/runners/airflow/tasks/defaults/job_end.py
index 42b5e7f..e177ccc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_end.py
@@ -16,16 +16,29 @@
# specific language governing permissions and limitations
# under the License.
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
-class JobEndTask(task.Task):
+class JobEndTask(DefaultTask):
"""
- Job end task. Reports job end metrics.
+ Job end task. Reports job end metrics.
"""
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
def apply_task_to_dag(self):
- pass
+ job_end_task = JobEndOperator(
+ task_id='end',
+ namespace=self.metrics_namespace,
+ application_name=self.pipeline_name,
+ backends=self.metrics_backends,
+ dag=self.dag,
+ trigger_rule=self.trigger_rule
+ )
+
+ if self.parent:
+ self.parent.set_downstream(job_end_task)
+
+ return job_end_task
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
similarity index 61%
rename from rainbow/runners/airflow/tasks/job_end.py
rename to rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
index 42b5e7f..e177ccc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
@@ -16,16 +16,29 @@
# specific language governing permissions and limitations
# under the License.
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
-class JobEndTask(task.Task):
+class JobEndTask(DefaultTask):
"""
- Job end task. Reports job end metrics.
+ Job end task. Reports job end metrics.
"""
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
def apply_task_to_dag(self):
- pass
+ job_end_task = JobEndOperator(
+ task_id='end',
+ namespace=self.metrics_namespace,
+ application_name=self.pipeline_name,
+ backends=self.metrics_backends,
+ dag=self.dag,
+ trigger_rule=self.trigger_rule
+ )
+
+ if self.parent:
+ self.parent.set_downstream(job_end_task)
+
+ return job_end_task
diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py
similarity index 63%
copy from rainbow/runners/airflow/tasks/job_start.py
copy to rainbow/runners/airflow/tasks/defaults/job_start.py
index 64a2f4a..e196919 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_start.py
@@ -15,11 +15,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
-from rainbow.runners.airflow.model import task
-
-class JobStartTask(task.Task):
+class JobStartTask(DefaultTask):
"""
Job start task. Reports job start metrics.
"""
@@ -28,4 +28,16 @@ class JobStartTask(task.Task):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
def apply_task_to_dag(self):
- pass
+ job_start_task = JobStartOperator(
+ task_id='start',
+ namespace=self.metrics_namespace,
+ application_name=self.pipeline_name,
+ backends=self.metrics_backends,
+ dag=self.dag,
+ trigger_rule=self.trigger_rule
+ )
+
+ if self.parent:
+ self.parent.set_downstream(job_start_task)
+
+ return job_start_task
diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
similarity index 63%
rename from rainbow/runners/airflow/tasks/job_start.py
rename to rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
index 64a2f4a..e196919 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
@@ -15,11 +15,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
-from rainbow.runners.airflow.model import task
-
-class JobStartTask(task.Task):
+class JobStartTask(DefaultTask):
"""
Job start task. Reports job start metrics.
"""
@@ -28,4 +28,16 @@ class JobStartTask(task.Task):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
def apply_task_to_dag(self):
- pass
+ job_start_task = JobStartOperator(
+ task_id='start',
+ namespace=self.metrics_namespace,
+ application_name=self.pipeline_name,
+ backends=self.metrics_backends,
+ dag=self.dag,
+ trigger_rule=self.trigger_rule
+ )
+
+ if self.parent:
+ self.parent.set_downstream(job_start_task)
+
+ return job_start_task
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
index c8f2e38..d8c1afc 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -1,17 +1,38 @@
import os
+import unittest
from unittest import TestCase
from rainbow.runners.airflow.dag import rainbow_dags
-import unittest
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator, JobStartOperator
class Test(TestCase):
def test_register_dags(self):
- base_path = os.path.join(os.path.dirname(__file__), '../rainbow')
- dags = rainbow_dags.register_dags(base_path)
+ dags = self.get_register_dags()
+
self.assertEqual(len(dags), 1)
- # TODO: elaborate test
- pass
+
+ test_pipeline = dags[0]
+ self.assertEqual(test_pipeline.dag_id, 'my_pipeline')
+
+ def test_default_start_task(self):
+ dags = self.get_register_dags()
+
+ task_dict = dags[0].task_dict
+
+ self.assertIsInstance(task_dict['start'], JobStartOperator)
+
+ def test_default_end_task(self):
+ dags = self.get_register_dags()
+
+ task_dict = dags[0].task_dict
+
+ self.assertIsInstance(task_dict['end'], JobEndOperator)
+
+ @staticmethod
+ def get_register_dags():
+ base_path = os.path.join(os.path.dirname(__file__), '../rainbow')
+ return rainbow_dags.register_dags(base_path)
if __name__ == '__main__':
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index e9f9045..27507fd 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -23,7 +23,9 @@ pipelines:
start_date: 1970-01-01
timeout-minutes: 45
schedule: 0 * 1 * *
- metrics-namespace: TestNamespace
+ metrics:
+ namespace: TestNamespace
+ backends: [ 'cloudwatch' ]
tasks:
- task: my_static_input_task
type: python
@@ -36,7 +38,7 @@ pipelines:
input_type: static
input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
output_path: /output.json
- cmd: python -u helloworld.py
+ cmd: python -u hello_world.py
# - task: my_parallelized_static_input_task
# type: python
# description: parallelized static input task
@@ -59,7 +61,7 @@ pipelines:
env2: "b"
input_type: task
input_path: my_static_input_task
- cmd: python -u helloworld.py
+ cmd: python -u hello_world.py
services:
- service:
name: my_python_server
diff --git a/rainbow/runners/__init__.py b/tests/runners/airflow/tasks/defaults/__init__.py
similarity index 100%
rename from rainbow/runners/__init__.py
rename to tests/runners/airflow/tasks/defaults/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/test_job_end.py b/tests/runners/airflow/tasks/defaults/test_job_end.py
new file mode 100644
index 0000000..9a2c398
--- /dev/null
+++ b/tests/runners/airflow/tasks/defaults/test_job_end.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase
+
+from rainbow.runners.airflow.tasks.defaults import job_end
+from tests.util import dag_test_utils
+
+
+class TestJobEndTask(TestCase):
+
+ def test_apply_task_to_dag(self):
+ conf = {
+ 'pipeline': 'my_pipeline',
+ 'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']},
+ }
+
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, 'EndJobNameSpace')
+ self.assertEqual(dag_task0.backends, ['cloudwatch'])
+
+ self.assertEqual(dag_task0.task_id, 'end')
+
+ def test_apply_task_to_dag_missing_metrics(self):
+ conf = {'pipeline': 'my_pipeline'}
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, '')
+ self.assertEqual(dag_task0.backends, [])
+ self.assertEqual(dag_task0.trigger_rule, 'all_done')
+
+ def test_apply_task_to_dag_with_partial_configuration(self):
+ conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'EndJobNameSpace'}}
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, 'EndJobNameSpace')
+ self.assertEqual(dag_task0.backends, [])
+ self.assertEqual(dag_task0.trigger_rule, 'all_done')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/runners/airflow/tasks/defaults/test_job_start.py b/tests/runners/airflow/tasks/defaults/test_job_start.py
new file mode 100644
index 0000000..d07cf4b
--- /dev/null
+++ b/tests/runners/airflow/tasks/defaults/test_job_start.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase
+
+from rainbow.runners.airflow.tasks.defaults import job_end, job_start
+from tests.util import dag_test_utils
+
+
+class TestJobStartTask(TestCase):
+
+ def test_apply_task_to_dag(self):
+ conf = {
+ 'pipeline': 'my_pipeline',
+ 'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']},
+ }
+
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, 'StartJobNameSpace')
+ self.assertEqual(dag_task0.backends, ['cloudwatch'])
+
+ self.assertEqual(dag_task0.task_id, 'start')
+
+ def test_apply_task_to_dag_missing_metrics(self):
+ conf = {'pipeline': 'my_pipeline'}
+
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_start.JobStartTask(dag, 'my_end_pipeline', None, conf, 'all_success')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, '')
+ self.assertEqual(dag_task0.backends, [])
+ self.assertEqual(dag_task0.trigger_rule, 'all_success')
+
+ def test_apply_task_to_dag_with_partial_configuration(self):
+ conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'StartJobNameSpace'}}
+ dag = dag_test_utils.create_dag()
+
+ task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertEqual(dag_task0.namespace, 'StartJobNameSpace')
+ self.assertEqual(dag_task0.backends, [])
+
+
+if __name__ == '__main__':
+ unittest.main()