You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:02 UTC

[incubator-liminal] 22/43: Add job_start and job_end tasks

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 75965668ccf2e75b2f261ce0ca2d5d26ce117852
Author: zionrubin <zi...@naturalint.com>
AuthorDate: Sun Mar 22 10:47:52 2020 +0200

    Add job_start and job_end tasks
---
 rainbow/runners/airflow/dag/rainbow_dags.py        | 21 ++++--
 .../{ => airflow/tasks/defaults}/__init__.py       |  0
 .../tasks/{job_end.py => defaults/default_task.py} | 14 +++-
 .../airflow/tasks/{ => defaults}/job_end.py        | 21 ++++--
 .../tasks/{job_end.py => defaults/job_end.py~HEAD} | 21 ++++--
 .../airflow/tasks/{ => defaults}/job_start.py      | 20 ++++--
 .../{job_start.py => defaults/job_start.py~HEAD}   | 20 ++++--
 tests/runners/airflow/dag/test_rainbow_dags.py     | 31 +++++++--
 tests/runners/airflow/rainbow/rainbow.yml          |  8 ++-
 .../runners/airflow/tasks/defaults}/__init__.py    |  0
 .../runners/airflow/tasks/defaults/test_job_end.py | 77 ++++++++++++++++++++++
 .../airflow/tasks/defaults/test_job_start.py       | 77 ++++++++++++++++++++++
 12 files changed, 276 insertions(+), 34 deletions(-)

diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 15b7d9a..71d18d2 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -22,8 +22,11 @@ import yaml
 from airflow import DAG
 from airflow.models import Variable
 
-from rainbow.core.util import files_util, class_util
+from rainbow.core.util import class_util
+from rainbow.core.util import files_util
 from rainbow.runners.airflow.model.task import Task
+from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask
+from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask
 
 
 def register_dags(configs_path):
@@ -42,8 +45,6 @@ def register_dags(configs_path):
             config = yaml.safe_load(stream)
 
             for pipeline in config['pipelines']:
-                parent = None
-
                 pipeline_name = pipeline['pipeline']
 
                 default_args = {
@@ -58,6 +59,9 @@ def register_dags(configs_path):
                     catchup=False
                 )
 
+                job_start_task = JobStartTask(dag, pipeline_name, None, pipeline, 'all_success')
+                parent = job_start_task.apply_task_to_dag()
+
                 trigger_rule = 'all_success'
                 if 'always_run' in config and config['always_run']:
                     trigger_rule = 'all_done'
@@ -70,12 +74,15 @@ def register_dags(configs_path):
 
                     parent = task_instance.apply_task_to_dag()
 
-                print(f'{pipeline_name}: {dag.tasks}')
+                job_end_task = JobEndTask(dag, pipeline_name, parent, pipeline, 'all_done')
+                job_end_task.apply_task_to_dag()  # once per pipeline, after its last task
+
+                print(f'{pipeline_name}: {dag.tasks}')
 
-                globals()[pipeline_name] = dag
+                globals()[pipeline_name] = dag
 
-                dags.append(dag)
-    return dags
+                dags.append(dag)
+    return dags  # only after every config file and pipeline has been processed
 
 
 print(f'Loading task implementations..')
diff --git a/rainbow/runners/__init__.py b/rainbow/runners/airflow/tasks/defaults/__init__.py
similarity index 100%
copy from rainbow/runners/__init__.py
copy to rainbow/runners/airflow/tasks/defaults/__init__.py
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/default_task.py
similarity index 74%
copy from rainbow/runners/airflow/tasks/job_end.py
copy to rainbow/runners/airflow/tasks/defaults/default_task.py
index 42b5e7f..0e901fc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/default_task.py
@@ -15,17 +15,25 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Base class for the default (job start / job end) tasks, holding shared metrics settings.
+"""
+from abc import abstractmethod
 
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.model.task import Task
 
 
-class JobEndTask(task.Task):
+class DefaultTask(Task):
     """
-    Job end task. Reports job end metrics.
+    Default base task; exposes metrics_namespace / metrics_backends read from config['metrics'].
     """
 
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+        metrics = self.config.get('metrics') or {}  # `or` also covers an explicit null in YAML
+        self.metrics_namespace = metrics.get('namespace') or ''
+        self.metrics_backends = metrics.get('backends') or []
 
+    @abstractmethod  # NOTE(review): only enforced if Task's metaclass is ABCMeta - confirm
     def apply_task_to_dag(self):
         pass
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py
similarity index 61%
copy from rainbow/runners/airflow/tasks/job_end.py
copy to rainbow/runners/airflow/tasks/defaults/job_end.py
index 42b5e7f..e177ccc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_end.py
@@ -16,16 +16,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
 
 
-class JobEndTask(task.Task):
+class JobEndTask(DefaultTask):  # metrics_namespace / metrics_backends come from DefaultTask.__init__
     """
-    Job end task. Reports job end metrics.
+      Job end task. Reports job end metrics.
     """
 
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
     def apply_task_to_dag(self):
-        pass
+        job_end_task = JobEndOperator(
+            task_id='end',  # fixed id; the DAG tests look it up as task_dict['end']
+            namespace=self.metrics_namespace,
+            application_name=self.pipeline_name,
+            backends=self.metrics_backends,
+            dag=self.dag,
+            trigger_rule=self.trigger_rule
+        )
+
+        if self.parent:  # chain after the upstream task only when one was given
+            self.parent.set_downstream(job_end_task)
+
+        return job_end_task  # the created operator, so callers can chain further
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
similarity index 61%
rename from rainbow/runners/airflow/tasks/job_end.py
rename to rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
index 42b5e7f..e177ccc 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD
@@ -16,16 +16,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
 
 
-class JobEndTask(task.Task):
+class JobEndTask(DefaultTask):  # NOTE(review): this ~HEAD file duplicates defaults/job_end.py (same blob e177ccc); likely a merge leftover - confirm and drop
     """
-    Job end task. Reports job end metrics.
+      Job end task. Reports job end metrics.
     """
 
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
     def apply_task_to_dag(self):
-        pass
+        job_end_task = JobEndOperator(
+            task_id='end',  # fixed id shared with defaults/job_end.py
+            namespace=self.metrics_namespace,
+            application_name=self.pipeline_name,
+            backends=self.metrics_backends,
+            dag=self.dag,
+            trigger_rule=self.trigger_rule
+        )
+
+        if self.parent:  # chain after the upstream task only when one was given
+            self.parent.set_downstream(job_end_task)
+
+        return job_end_task  # the created operator, so callers can chain further
diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py
similarity index 63%
copy from rainbow/runners/airflow/tasks/job_start.py
copy to rainbow/runners/airflow/tasks/defaults/job_start.py
index 64a2f4a..e196919 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_start.py
@@ -15,11 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
 
-from rainbow.runners.airflow.model import task
 
-
-class JobStartTask(task.Task):
+class JobStartTask(DefaultTask):  # metrics_namespace / metrics_backends come from DefaultTask.__init__
     """
     Job start task. Reports job start metrics.
     """
@@ -28,4 +28,16 @@ class JobStartTask(task.Task):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
     def apply_task_to_dag(self):
-        pass
+        job_start_task = JobStartOperator(
+            task_id='start',  # fixed id; the DAG tests look it up as task_dict['start']
+            namespace=self.metrics_namespace,
+            application_name=self.pipeline_name,
+            backends=self.metrics_backends,
+            dag=self.dag,
+            trigger_rule=self.trigger_rule
+        )
+
+        if self.parent:  # chain after the upstream task only when one was given
+            self.parent.set_downstream(job_start_task)
+
+        return job_start_task  # register_dags uses this as the parent of the first pipeline task
diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
similarity index 63%
rename from rainbow/runners/airflow/tasks/job_start.py
rename to rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
index 64a2f4a..e196919 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD
@@ -15,11 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator
+from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask
 
-from rainbow.runners.airflow.model import task
 
-
-class JobStartTask(task.Task):
+class JobStartTask(DefaultTask):  # NOTE(review): this ~HEAD file duplicates defaults/job_start.py (same blob e196919); likely a merge leftover - confirm and drop
     """
     Job start task. Reports job start metrics.
     """
@@ -28,4 +28,16 @@ class JobStartTask(task.Task):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
     def apply_task_to_dag(self):
-        pass
+        job_start_task = JobStartOperator(
+            task_id='start',  # fixed id shared with defaults/job_start.py
+            namespace=self.metrics_namespace,
+            application_name=self.pipeline_name,
+            backends=self.metrics_backends,
+            dag=self.dag,
+            trigger_rule=self.trigger_rule
+        )
+
+        if self.parent:  # chain after the upstream task only when one was given
+            self.parent.set_downstream(job_start_task)
+
+        return job_start_task  # the created operator, so callers can chain further
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
index c8f2e38..d8c1afc 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -1,17 +1,38 @@
 import os
+import unittest
 from unittest import TestCase
 
 from rainbow.runners.airflow.dag import rainbow_dags
-import unittest
+from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator, JobStartOperator
 
 
 class Test(TestCase):
     def test_register_dags(self):
-        base_path = os.path.join(os.path.dirname(__file__), '../rainbow')
-        dags = rainbow_dags.register_dags(base_path)
+        dags = self.get_register_dags()  # NOTE(review): len == 1 also holds if register_dags returns early - consider asserting task counts
+
         self.assertEqual(len(dags), 1)
-        # TODO: elaborate test
-        pass
+
+        test_pipeline = dags[0]
+        self.assertEqual(test_pipeline.dag_id, 'my_pipeline')
+
+    def test_default_start_task(self):
+        dags = self.get_register_dags()
+
+        task_dict = dags[0].task_dict
+
+        self.assertIsInstance(task_dict['start'], JobStartOperator)  # JobStartTask registers task_id 'start'
+
+    def test_default_end_task(self):  # NOTE(review): does not check that 'end' is downstream of the last pipeline task
+        dags = self.get_register_dags()
+
+        task_dict = dags[0].task_dict
+
+        self.assertIsInstance(task_dict['end'], JobEndOperator)  # JobEndTask registers task_id 'end'
+
+    @staticmethod
+    def get_register_dags():
+        base_path = os.path.join(os.path.dirname(__file__), '../rainbow')  # tests/runners/airflow/rainbow config dir
+        return rainbow_dags.register_dags(base_path)
 
 
 if __name__ == '__main__':
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index e9f9045..27507fd 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -23,7 +23,9 @@ pipelines:
     start_date: 1970-01-01
     timeout-minutes: 45
     schedule: 0 * 1 * *
-    metrics-namespace: TestNamespace
+    metrics:  # read by the default start/end tasks (namespace, backends)
+      namespace: TestNamespace
+      backends: [ 'cloudwatch' ]
     tasks:
       - task: my_static_input_task
         type: python
@@ -36,7 +38,7 @@ pipelines:
         input_type: static
         input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
         output_path: /output.json
-        cmd: python -u helloworld.py
+        cmd: python -u hello_world.py
 #      - task: my_parallelized_static_input_task
 #        type: python
 #        description: parallelized static input task
@@ -59,7 +61,7 @@ pipelines:
           env2: "b"
         input_type: task
         input_path: my_static_input_task
-        cmd: python -u helloworld.py
+        cmd: python -u hello_world.py
 services:
   - service:
     name: my_python_server
diff --git a/rainbow/runners/__init__.py b/tests/runners/airflow/tasks/defaults/__init__.py
similarity index 100%
rename from rainbow/runners/__init__.py
rename to tests/runners/airflow/tasks/defaults/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/test_job_end.py b/tests/runners/airflow/tasks/defaults/test_job_end.py
new file mode 100644
index 0000000..9a2c398
--- /dev/null
+++ b/tests/runners/airflow/tasks/defaults/test_job_end.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase
+
+from rainbow.runners.airflow.tasks.defaults import job_end
+from tests.util import dag_test_utils
+
+
+class TestJobEndTask(TestCase):
+
+    def test_apply_task_to_dag(self):  # full metrics config is forwarded to the operator
+        conf = {
+            'pipeline': 'my_pipeline',
+            'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']},
+        }
+
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')  # parent=None: nothing to chain
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)  # only the 'end' operator was added
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, 'EndJobNameSpace')
+        self.assertEqual(dag_task0.backends, ['cloudwatch'])
+
+        self.assertEqual(dag_task0.task_id, 'end')
+
+    def test_apply_task_to_dag_missing_metrics(self):  # no 'metrics' key at all
+        conf = {'pipeline': 'my_pipeline'}
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, '')  # DefaultTask's fallback namespace
+        self.assertEqual(dag_task0.backends, [])  # DefaultTask's fallback backends
+        self.assertEqual(dag_task0.trigger_rule, 'all_done')
+
+    def test_apply_task_to_dag_with_partial_configuration(self):  # 'metrics' without 'backends'
+        conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'EndJobNameSpace'}}
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, 'EndJobNameSpace')
+        self.assertEqual(dag_task0.backends, [])  # omitted backends fall back to an empty list
+        self.assertEqual(dag_task0.trigger_rule, 'all_done')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/runners/airflow/tasks/defaults/test_job_start.py b/tests/runners/airflow/tasks/defaults/test_job_start.py
new file mode 100644
index 0000000..d07cf4b
--- /dev/null
+++ b/tests/runners/airflow/tasks/defaults/test_job_start.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase
+
+from rainbow.runners.airflow.tasks.defaults import job_start
+from tests.util import dag_test_utils
+
+
+class TestJobStartTask(TestCase):
+
+    def test_apply_task_to_dag(self):  # full metrics config is forwarded to the operator
+        conf = {
+            'pipeline': 'my_pipeline',
+            'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']},
+        }
+
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)  # only the 'start' operator was added
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, 'StartJobNameSpace')
+        self.assertEqual(dag_task0.backends, ['cloudwatch'])
+
+        self.assertEqual(dag_task0.task_id, 'start')
+
+    def test_apply_task_to_dag_missing_metrics(self):  # no 'metrics' key at all
+        conf = {'pipeline': 'my_pipeline'}
+
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, '')  # DefaultTask's fallback namespace
+        self.assertEqual(dag_task0.backends, [])  # DefaultTask's fallback backends
+        self.assertEqual(dag_task0.trigger_rule, 'all_success')
+
+    def test_apply_task_to_dag_with_partial_configuration(self):  # 'metrics' without 'backends'
+        conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'StartJobNameSpace'}}
+        dag = dag_test_utils.create_dag()
+
+        task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertEqual(dag_task0.namespace, 'StartJobNameSpace')
+        self.assertEqual(dag_task0.backends, [])  # omitted backends fall back to an empty list
+
+
+if __name__ == '__main__':
+    unittest.main()