You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:11 UTC
[incubator-liminal] 31/43: Add pipeline configuration as default
arguments
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 253a15a16cef606085632d83ddf48a6640450b52
Author: Oded Rosenberg <od...@naturalint.com>
AuthorDate: Thu Apr 9 15:06:03 2020 +0300
Add pipeline configuration as default arguments
---
rainbow/runners/airflow/dag/rainbow_dags.py | 11 ++++++++---
tests/runners/airflow/dag/test_rainbow_dags.py | 9 +++++++++
tests/runners/airflow/rainbow/rainbow.yml | 7 ++++++-
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 6b071fd..d5e3be1 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -28,6 +28,8 @@ from rainbow.runners.airflow.model.task import Task
from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask
from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask
+__DEPENDS_ON_PAST = 'depends_on_past'
+
def register_dags(configs_path):
"""
@@ -47,12 +49,15 @@ def register_dags(configs_path):
for pipeline in config['pipelines']:
pipeline_name = pipeline['pipeline']
- default_args = {
- 'owner': config['owner'],
+ default_args = {k: v for k, v in pipeline.items()}
+
+ override_args = {
'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
- 'depends_on_past': False,
+ __DEPENDS_ON_PAST: default_args[__DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in default_args else False,
}
+ default_args.update(override_args)
+
dag = DAG(
dag_id=pipeline_name,
default_args=default_args,
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
index c744ce5..5ffdf07 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -31,6 +31,15 @@ class Test(TestCase):
self.assertIsInstance(task_dict['end'], JobEndOperator)
+ def test_default_args(self):
+ dag = self.get_register_dags()[0]
+ default_args = dag.default_args
+
+ keys = default_args.keys()
+ self.assertIn('default_arg_loaded', keys)
+ self.assertIn('default_array_loaded', keys)
+ self.assertIn('default_object_loaded', keys)
+
@staticmethod
def get_register_dags():
base_path = os.path.join(os.path.dirname(__file__), '../rainbow')
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index 05c0a09..0b08a1f 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -17,12 +17,17 @@
# under the License.
---
name: MyPipeline
-owner: Bosco Albert Baracus
pipelines:
- pipeline: my_pipeline
+ owner: Bosco Albert Baracus
start_date: 1970-01-01
timeout_minutes: 45
schedule: 0 * 1 * *
+ default_arg_loaded: check
+ default_array_loaded: [2, 3, 4]
+ default_object_loaded:
+ key1: val1
+ key2: val2
metrics:
namespace: TestNamespace
backends: [ 'cloudwatch' ]