You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:24:43 UTC
[incubator-liminal] 03/43: rainbow_dags dag creation + python task
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit aa63e16674d6bfa904a2b6764dd2257d2f3827de
Author: aviemzur <av...@gmail.com>
AuthorDate: Tue Mar 10 11:57:25 2020 +0200
rainbow_dags dag creation + python task
---
rainbow/cli/__init__.py | 1 +
rainbow/core/__init__.py | 1 +
rainbow/docker/__init__.py | 1 +
rainbow/http/__init__.py | 1 +
rainbow/monitoring/__init__.py | 1 +
.../runners/airflow/compiler/rainbow_compiler.py | 8 +-
rainbow/runners/airflow/dag/rainbow_dags.py | 90 ++++++++++
.../airflow/model}/__init__.py | 0
.../rainbow_compiler.py => model/task.py} | 22 ++-
.../airflow/tasks}/__init__.py | 0
rainbow/runners/airflow/tasks/python.py | 190 +++++++++++++++++++++
tests/runners/airflow/compiler/rainbow.yml | 115 -------------
.../airflow/compiler/test_rainbow_compiler.py | 33 ----
.../runners/airflow/dag}/__init__.py | 0
tests/runners/airflow/dag/rainbow/rainbow.yml | 51 ++++++
tests/runners/airflow/dag/test_rainbow_dags.py | 11 ++
.../runners/airflow/tasks}/__init__.py | 0
tests/runners/airflow/tasks/test_python.py | 50 ++++++
{rainbow/monitoring => tests/util}/__init__.py | 0
.../util/dag_test_utils.py | 21 ++-
20 files changed, 429 insertions(+), 167 deletions(-)
diff --git a/rainbow/cli/__init__.py b/rainbow/cli/__init__.py
index 217e5db..c24b2fa 100644
--- a/rainbow/cli/__init__.py
+++ b/rainbow/cli/__init__.py
@@ -15,3 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# TODO: cli
diff --git a/rainbow/core/__init__.py b/rainbow/core/__init__.py
index 217e5db..2162b08 100644
--- a/rainbow/core/__init__.py
+++ b/rainbow/core/__init__.py
@@ -15,3 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# TODO: core
diff --git a/rainbow/docker/__init__.py b/rainbow/docker/__init__.py
index 217e5db..8bb1ec2 100644
--- a/rainbow/docker/__init__.py
+++ b/rainbow/docker/__init__.py
@@ -15,3 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# TODO: docker
diff --git a/rainbow/http/__init__.py b/rainbow/http/__init__.py
index 217e5db..d723ae2 100644
--- a/rainbow/http/__init__.py
+++ b/rainbow/http/__init__.py
@@ -15,3 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# TODO: http
diff --git a/rainbow/monitoring/__init__.py b/rainbow/monitoring/__init__.py
index 217e5db..8df8694 100644
--- a/rainbow/monitoring/__init__.py
+++ b/rainbow/monitoring/__init__.py
@@ -15,3 +15,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# TODO: monitoring
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/compiler/rainbow_compiler.py
index 818fdc5..bed1efd 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/rainbow/runners/airflow/compiler/rainbow_compiler.py
@@ -16,11 +16,5 @@
# specific language governing permissions and limitations
# under the License.
"""
-Compiler for rainbows.
+TODO: compiler for rainbows.
"""
-import yaml
-
-
-def parse_yaml(path):
- with open(path, 'r') as stream:
- return yaml.safe_load(stream)
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
new file mode 100644
index 0000000..577da07
--- /dev/null
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# TODO: Iterate over each pipeline and create a DAG for it.
+# Within every pipeline, iterate over its tasks and apply them to the DAG.
+
+import os
+import pprint
+
+import yaml
+from airflow import DAG
+
+from rainbow.runners.airflow.tasks.python import PythonTask
+from datetime import datetime
+
+
+def register_dags(path):
+ files = []
+ for r, d, f in os.walk(path):
+ for file in f:
+ if file[file.rfind('.') + 1:] in ['yml', 'yaml']:
+ files.append(os.path.join(r, file))
+
+ print(files)
+
+ dags = []
+
+ for config_file in files:
+ print(f'Registering DAG for file: {config_file}')
+
+ with open(config_file) as stream:
+ # TODO: validate config
+ config = yaml.safe_load(stream)
+ pp = pprint.PrettyPrinter(indent=4)
+ # pp.pprint(config)
+
+ for pipeline in config['pipelines']:
+ parent = None
+
+ default_args = {
+ 'owner': config['owner'],
+ 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time())
+ }
+ # TODO: add all relevant airflow args
+ dag = DAG(
+ dag_id='test_dag',
+ default_args=default_args
+ )
+
+ for task in pipeline['tasks']:
+ task_type = task['type']
+ task_instance = get_task_class(task_type)(
+ dag, pipeline['pipeline'], parent if parent else None, task, 'all_success'
+ )
+ parent = task_instance.apply_task_to_dag()
+
+ print(f'{parent}{{{task_type}}}')
+
+ dags.append(dag)
+ return dags
+
+
+# TODO: task class registry
+task_classes = {
+ 'python': PythonTask
+}
+
+
+def get_task_class(task_type):
+ return task_classes[task_type]
+
+
+if __name__ == '__main__':
+ # TODO: configurable yaml dir
+ path = 'tests/runners/airflow/dag/rainbow'
+ register_dags(path)
diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/model/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to rainbow/runners/airflow/model/__init__.py
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/model/task.py
similarity index 72%
copy from rainbow/runners/airflow/compiler/rainbow_compiler.py
copy to rainbow/runners/airflow/model/task.py
index 818fdc5..e74085d 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -16,11 +16,23 @@
# specific language governing permissions and limitations
# under the License.
"""
-Compiler for rainbows.
+Base task.
"""
-import yaml
-def parse_yaml(path):
- with open(path, 'r') as stream:
- return yaml.safe_load(stream)
+class Task:
+ """
+ Task.
+ """
+
+ def setup(self):
+ """
+ Setup method for task.
+ """
+ raise NotImplementedError()
+
+ def apply_task_to_dag(self):
+ """
+ Registers Airflow operator to parent task.
+ """
+ raise NotImplementedError()
diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/tasks/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to rainbow/runners/airflow/tasks/__init__.py
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
new file mode 100644
index 0000000..727e11c
--- /dev/null
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+
+from airflow.models import Variable
+from airflow.operators.dummy_operator import DummyOperator
+
+from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
+ ConfigurableKubernetesPodOperator, \
+ ConfigureParallelExecutionOperator
+
+
+class PythonTask(task.Task):
+ """
+ Python task.
+ """
+
+ def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+ self.dag = dag
+ self.parent = parent
+ self.config = config
+ self.trigger_rule = trigger_rule
+ self.input_type = config['input_type']
+ self.input_path = config['input_path']
+ self.task_name = config['task']
+ self.image = self.config['image']
+ self.resources = self.__resources_config(config)
+ self.env_vars = self.__env_vars(pipeline_name, config)
+ self.kubernetes_kwargs = self.__kubernetes_kwargs(
+ dag, self.env_vars, self.resources, self.task_name
+ )
+ self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments(config)
+ self.config_task_id = self.task_name + '_input'
+ self.executors = self.__executors(config)
+
+ def setup(self):
+ # TODO: build docker image if needed.
+ pass
+
+ def apply_task_to_dag(self):
+
+ def create_pod_operator(task_id, task_split, image):
+ return ConfigurableKubernetesPodOperator(
+ task_id=task_id,
+ config_task_id=self.config_task_id,
+ task_split=task_split,
+ image=image,
+ cmds=self.cmds,
+ arguments=self.arguments,
+ **self.kubernetes_kwargs
+ )
+
+ config_task = None
+
+ if self.input_type in ['static', 'task']:
+ self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+
+ config_task = ConfigureParallelExecutionOperator(
+ task_id=self.config_task_id,
+ image=self.image,
+ config_type=self.input_type,
+ config_path=self.input_path,
+ executors=self.executors,
+ **self.kubernetes_kwargs
+ )
+
+ if self.executors == 1:
+ pod_task = create_pod_operator(
+ task_id=f'{self.task_name}',
+ task_split=0,
+ image=f'''{self.image}'''
+ )
+
+ first_task = pod_task
+
+ if config_task:
+ first_task = config_task
+ first_task.set_downstream(pod_task)
+
+ if self.parent:
+ self.parent.set_downstream(first_task)
+
+ return pod_task
+ else:
+ if not config_task:
+ config_task = DummyOperator(
+ task_id=self.config_task_id,
+ trigger_rule=self.trigger_rule,
+ dag=self.dag
+ )
+
+ end_task = DummyOperator(
+ task_id=self.task_name,
+ dag=self.dag
+ )
+
+ if self.parent:
+ self.parent.set_downstream(config_task)
+
+ for i in range(self.executors):
+ split_task = create_pod_operator(
+ task_id=f'''{self.task_name}_{i}''',
+ task_split=i,
+ image=self.image
+ )
+
+ config_task.set_downstream(split_task)
+
+ split_task.set_downstream(end_task)
+
+ return end_task
+
+ @staticmethod
+ def __executors(config):
+ executors = 1
+ if 'executors' in config:
+ executors = config['executors']
+ return executors
+
+ @staticmethod
+ def __kubernetes_cmds_and_arguments(config):
+ cmds = ['/bin/bash', '-c']
+ arguments = [
+ f'''sh container-setup.sh && \
+ {config['cmd']} && \
+ sh container-teardown.sh {config['output_path']}'''
+ ]
+ return cmds, arguments
+
+ @staticmethod
+ def __kubernetes_kwargs(dag, env_vars, resources, task_name):
+ kubernetes_kwargs = {
+ 'namespace': Variable.get('kubernetes_namespace', default_var='default'),
+ 'name': task_name.replace('_', '-'),
+ 'in_cluster': Variable.get('in_kubernetes_cluster', default_var=False),
+ 'image_pull_policy': Variable.get('image_pull_policy', default_var='IfNotPresent'),
+ 'get_logs': True,
+ 'env_vars': env_vars,
+ 'do_xcom_push': True,
+ 'is_delete_operator_pod': True,
+ 'startup_timeout_seconds': 300,
+ 'image_pull_secrets': 'regcred',
+ 'resources': resources,
+ 'dag': dag
+ }
+ return kubernetes_kwargs
+
+ @staticmethod
+ def __env_vars(pipeline_name, config):
+ env_vars = {}
+ if 'env_vars' in config:
+ env_vars = config['env_vars']
+ airflow_configuration_variable = Variable.get(
+ f'''{pipeline_name}_dag_configuration''',
+ default_var=None)
+ if airflow_configuration_variable:
+ airflow_configs = json.loads(airflow_configuration_variable)
+ environment_variables_key = f'''{pipeline_name}_environment_variables'''
+ if environment_variables_key in airflow_configs:
+ env_vars = airflow_configs[environment_variables_key]
+ return env_vars
+
+ @staticmethod
+ def __resources_config(config):
+ resources = {}
+ if 'request_cpu' in config:
+ resources['request_cpu'] = config['request_cpu']
+ if 'request_memory' in config:
+ resources['request_memory'] = config['request_memory']
+ if 'limit_cpu' in config:
+ resources['limit_cpu'] = config['limit_cpu']
+ if 'limit_memory' in config:
+ resources['limit_memory'] = config['limit_memory']
+ return resources
diff --git a/tests/runners/airflow/compiler/rainbow.yml b/tests/runners/airflow/compiler/rainbow.yml
deleted file mode 100644
index 45333a8..0000000
--- a/tests/runners/airflow/compiler/rainbow.yml
+++ /dev/null
@@ -1,115 +0,0 @@
-
----
-name: MyPipeline
-owner: Bosco Albert Baracus
-pipeline:
- timeout-minutes: 45
- schedule: 0 * 1 * *
- metrics-namespace: TestNamespace
- tasks:
- - name: mytask1
- type: sql
- description: mytask1 is cool
- query: "select * from mytable"
- overrides:
- - prod:
- partition-columns: dt
- output-table: test.test_impression_prod
- output-path: s3://mybucket/myproject-test/impression
- emr-cluster-name: spark-playground-prod
- - stg:
- query: "select * from mytable"
- partition-columns: dt
- output-table: test.test_impression_stg
- output-path: s3://mybucket/haya-test/impression
- emr-cluster-name: spark-playground-staging
- tasks:
- - name: my_static_config_task
- type: python
- description: my 1st ds task
- artifact-id: mytask1artifactid
- source: mytask1folder
- env-vars:
- env1: "a"
- env2: "b"
- config-type: static
- config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
- cmd: python -u my_app.py
- - task:
- name: my_no_config_task
- type: python
- description: my 2nd ds task
- artifact-id: mytask1artifactid
- env-vars:
- env1: "a"
- env2: "b"
- request-cpu: 100m
- request-memory: 65M
- cmd: python -u my_app.py foo bar
- - task:
- name: my_create_custom_config_task
- type: python
- description: my 2nd ds task
- artifact-id: myconftask
- source: myconftask
- output-config-path: /my_conf.json
- env-vars:
- env1: "a"
- env2: "b"
- cmd: python -u my_app.py foo bar
- - task:
- name: my_custom_config_task
- type: python
- description: my 2nd ds task
- artifact-id: mytask1artifactid
- config-type: task
- config-path: my_create_custom_config_task
- env-vars:
- env1: "a"
- env2: "b"
- cmd: python -u my_app.py foo bar
- - task:
- name: my_parallelized_static_config_task
- type: python
- description: my 3rd ds task
- artifact-id: mytask1artifactid
- executors: 5
- env-vars:
- env1: "x"
- env2: "y"
- myconf: $CONFIG_FILE
- config-type: static
- config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 }, { \"campaign_id\": 30 }, { \"campaign_id\": 40 }, { \"campaign_id\": 50 }, { \"campaign_id\": 60 }, { \"campaign_id\": 70 }, { \"campaign_id\": 80 } ]}"
- cmd: python -u my_app.py $CONFIG_FILE
- - task:
- name: my_parallelized_custom_config_task
- type: python
- description: my 4th ds task
- artifact-id: mytask1artifactid
- executors: 5
- config-type: task
- config-path: my_create_custom_config_task
- cmd: python -u my_app.py
- - task:
- name: my_parallelized_no_config_task
- type: python
- description: my 4th ds task
- artifact-id: mytask1artifactid
- executors: 5
- cmd: python -u my_app.py
-services:
- - service:
- name: myserver1
- type: python-server
- description: my python server
- artifact-id: myserver1artifactid
- source: myserver1logicfolder
- endpoints:
- - endpoint:
- path: /myendpoint1
- module: mymodule1
- function: myfun1
- - endpoint:
- path: /myendpoint2
- module: mymodule2
- function: myfun2
diff --git a/tests/runners/airflow/compiler/test_rainbow_compiler.py b/tests/runners/airflow/compiler/test_rainbow_compiler.py
deleted file mode 100644
index 6e73d8f..0000000
--- a/tests/runners/airflow/compiler/test_rainbow_compiler.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-
-from rainbow.runners.airflow.compiler import rainbow_compiler
-
-
-class TestRainbowCompiler(unittest.TestCase):
-
- def test_parse(self):
- expected = {'name': 'MyPipeline', 'owner': 'Bosco Albert Baracus', 'pipeline': {'timeout-minutes': 45, 'schedule': '0 * 1 * *', 'metrics-namespace': 'TestNamespace', 'tasks': [{'name': 'mytask1', 'type': 'sql', 'description': 'mytask1 is cool', 'query': 'select * from mytable', 'overrides': [{'prod': None, 'partition-columns': 'dt', 'output-table': 'test.test_impression_prod', 'output-path': 's3://mybucket/myproject-test/impression', 'emr-cluster-name': 'spark-playground-prod'}, [...]
- actual = rainbow_compiler.parse_yaml('tests/runners/airflow/compiler/rainbow.yml')
- self.assertEqual(expected, actual)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/dag/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/dag/__init__.py
diff --git a/tests/runners/airflow/dag/rainbow/rainbow.yml b/tests/runners/airflow/dag/rainbow/rainbow.yml
new file mode 100644
index 0000000..07afd08
--- /dev/null
+++ b/tests/runners/airflow/dag/rainbow/rainbow.yml
@@ -0,0 +1,51 @@
+
+---
+name: MyPipeline
+owner: Bosco Albert Baracus
+pipelines:
+ - pipeline: my_pipeline
+ start_date: 1970-01-01
+ timeout-minutes: 45
+ schedule: 0 * 1 * *
+ metrics-namespace: TestNamespace
+ tasks:
+ - task: my_static_config_task
+ type: python
+ description: my 1st ds task
+ image: mytask1artifactid
+ source: mytask1folder
+ env_vars:
+ env1: "a"
+ env2: "b"
+ input_type: static
+ input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+ output_path: 'baz'
+ cmd: 'foo bar'
+ - task: my_static_config_task2
+ type: python
+ description: my 1st ds task
+ image: mytask1artifactid
+ source: mytask1folder
+ env_vars:
+ env1: "a"
+ env2: "b"
+ input_type: static
+ input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+ output_path: 'baz'
+ cmd: 'foo bar'
+services:
+ - service:
+ name: myserver1
+ type: python-server
+ description: my python server
+ artifact-id: myserver1artifactid
+ source: myserver1logicfolder
+ endpoints:
+ - endpoint:
+ path: /myendpoint1
+ module: mymodule1
+ function: myfun1
+ - endpoint:
+ path: /myendpoint2
+ module: mymodule2
+ function: myfun2
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
new file mode 100644
index 0000000..41bea09
--- /dev/null
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -0,0 +1,11 @@
+from unittest import TestCase
+
+from rainbow.runners.airflow.dag import rainbow_dags
+
+
+class Test(TestCase):
+ def test_register_dags(self):
+ dags = rainbow_dags.register_dags("tests/runners/airflow/dag/rainbow")
+ self.assertEqual(len(dags), 1)
+ # TODO: elaborate test
+ pass
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/tasks/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/tasks/__init__.py
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
new file mode 100644
index 0000000..4f5808b
--- /dev/null
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
+ ConfigurableKubernetesPodOperator
+from rainbow.runners.airflow.tasks import python
+from tests.util import dag_test_utils
+
+
+class TestPythonTask(TestCase):
+ def test_apply_task_to_dag(self):
+ # TODO: elaborate tests
+ dag = dag_test_utils.create_dag()
+
+ task_id = 'my_task'
+
+ config = {
+ 'task': task_id,
+ 'cmd': 'foo bar',
+ 'image': 'my_image',
+ 'input_type': 'my_input_type',
+ 'input_path': 'my_input',
+ 'output_path': '/my_output.json'
+ }
+
+ task0 = python.PythonTask(dag, 'my_pipeline', None, config, 'all_success')
+ task0.apply_task_to_dag()
+
+ self.assertEqual(len(dag.tasks), 1)
+ dag_task0 = dag.tasks[0]
+
+ self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
+ self.assertEqual(dag_task0.task_id, task_id)
diff --git a/rainbow/monitoring/__init__.py b/tests/util/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/util/__init__.py
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/tests/util/dag_test_utils.py
similarity index 76%
copy from rainbow/runners/airflow/compiler/rainbow_compiler.py
copy to tests/util/dag_test_utils.py
index 818fdc5..b1fbcab 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/tests/util/dag_test_utils.py
@@ -15,12 +15,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-Compiler for rainbows.
-"""
-import yaml
-def parse_yaml(path):
- with open(path, 'r') as stream:
- return yaml.safe_load(stream)
+from datetime import datetime
+
+from airflow import DAG
+
+
+def create_dag():
+ """
+ Test util to create a basic DAG for testing.
+ """
+
+ return DAG(
+ dag_id='test_dag',
+ default_args={'start_date': datetime(1970, 1, 1)}
+ )