You are viewing a plain-text version of this content. The canonical link to it is available on the original archive page (the hyperlink is not preserved in this plain-text version).
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:11 UTC

[incubator-liminal] 31/43: Add pipeline configuration as default arguments

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 253a15a16cef606085632d83ddf48a6640450b52
Author: Oded Rosenberg <od...@naturalint.com>
AuthorDate: Thu Apr 9 15:06:03 2020 +0300

    Add pipeline configuration as default arguments
---
 rainbow/runners/airflow/dag/rainbow_dags.py    | 11 ++++++++---
 tests/runners/airflow/dag/test_rainbow_dags.py |  9 +++++++++
 tests/runners/airflow/rainbow/rainbow.yml      |  7 ++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 6b071fd..d5e3be1 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -28,6 +28,8 @@ from rainbow.runners.airflow.model.task import Task
 from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask
 from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask
 
+__DEPENDS_ON_PAST = 'depends_on_past'
+
 
 def register_dags(configs_path):
     """
@@ -47,12 +49,15 @@ def register_dags(configs_path):
             for pipeline in config['pipelines']:
                 pipeline_name = pipeline['pipeline']
 
-                default_args = {
-                    'owner': config['owner'],
+                default_args = {k: v for k, v in pipeline.items()}
+
+                override_args = {
                     'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
-                    'depends_on_past': False,
+                    __DEPENDS_ON_PAST: default_args[__DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in default_args else False,
                 }
 
+                default_args.update(override_args)
+
                 dag = DAG(
                     dag_id=pipeline_name,
                     default_args=default_args,
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
index c744ce5..5ffdf07 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -31,6 +31,15 @@ class Test(TestCase):
 
         self.assertIsInstance(task_dict['end'], JobEndOperator)
 
+    def test_default_args(self):
+        dag = self.get_register_dags()[0]
+        default_args = dag.default_args
+
+        keys = default_args.keys()
+        self.assertIn('default_arg_loaded', keys)
+        self.assertIn('default_array_loaded', keys)
+        self.assertIn('default_object_loaded', keys)
+
     @staticmethod
     def get_register_dags():
         base_path = os.path.join(os.path.dirname(__file__), '../rainbow')
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index 05c0a09..0b08a1f 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -17,12 +17,17 @@
 # under the License.
 ---
 name: MyPipeline
-owner: Bosco Albert Baracus
 pipelines:
   - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
     start_date: 1970-01-01
     timeout_minutes: 45
     schedule: 0 * 1 * *
+    default_arg_loaded: check
+    default_array_loaded: [2, 3, 4]
+    default_object_loaded:
+      key1: val1
+      key2: val2
     metrics:
      namespace: TestNamespace
      backends: [ 'cloudwatch' ]