Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:24:43 UTC

[incubator-liminal] 03/43: rainbow_dags dag creation + python task

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit aa63e16674d6bfa904a2b6764dd2257d2f3827de
Author: aviemzur <av...@gmail.com>
AuthorDate: Tue Mar 10 11:57:25 2020 +0200

    rainbow_dags dag creation + python task
---
 rainbow/cli/__init__.py                            |   1 +
 rainbow/core/__init__.py                           |   1 +
 rainbow/docker/__init__.py                         |   1 +
 rainbow/http/__init__.py                           |   1 +
 rainbow/monitoring/__init__.py                     |   1 +
 .../runners/airflow/compiler/rainbow_compiler.py   |   8 +-
 rainbow/runners/airflow/dag/rainbow_dags.py        |  90 ++++++++++
 .../airflow/model}/__init__.py                     |   0
 .../rainbow_compiler.py => model/task.py}          |  22 ++-
 .../airflow/tasks}/__init__.py                     |   0
 rainbow/runners/airflow/tasks/python.py            | 190 +++++++++++++++++++++
 tests/runners/airflow/compiler/rainbow.yml         | 115 -------------
 .../airflow/compiler/test_rainbow_compiler.py      |  33 ----
 .../runners/airflow/dag}/__init__.py               |   0
 tests/runners/airflow/dag/rainbow/rainbow.yml      |  51 ++++++
 tests/runners/airflow/dag/test_rainbow_dags.py     |  11 ++
 .../runners/airflow/tasks}/__init__.py             |   0
 tests/runners/airflow/tasks/test_python.py         |  50 ++++++
 {rainbow/monitoring => tests/util}/__init__.py     |   0
 .../util/dag_test_utils.py                         |  21 ++-
 20 files changed, 429 insertions(+), 167 deletions(-)

diff --git a/rainbow/cli/__init__.py b/rainbow/cli/__init__.py
index 217e5db..c24b2fa 100644
--- a/rainbow/cli/__init__.py
+++ b/rainbow/cli/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: cli
diff --git a/rainbow/core/__init__.py b/rainbow/core/__init__.py
index 217e5db..2162b08 100644
--- a/rainbow/core/__init__.py
+++ b/rainbow/core/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: core
diff --git a/rainbow/docker/__init__.py b/rainbow/docker/__init__.py
index 217e5db..8bb1ec2 100644
--- a/rainbow/docker/__init__.py
+++ b/rainbow/docker/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: docker
diff --git a/rainbow/http/__init__.py b/rainbow/http/__init__.py
index 217e5db..d723ae2 100644
--- a/rainbow/http/__init__.py
+++ b/rainbow/http/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: http
diff --git a/rainbow/monitoring/__init__.py b/rainbow/monitoring/__init__.py
index 217e5db..8df8694 100644
--- a/rainbow/monitoring/__init__.py
+++ b/rainbow/monitoring/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: monitoring
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/compiler/rainbow_compiler.py
index 818fdc5..bed1efd 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/rainbow/runners/airflow/compiler/rainbow_compiler.py
@@ -16,11 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Compiler for rainbows.
+TODO: compiler for rainbows.
 """
-import yaml
-
-
-def parse_yaml(path):
-    with open(path, 'r') as stream:
-        return yaml.safe_load(stream)
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
new file mode 100644
index 0000000..577da07
--- /dev/null
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# TODO: Iterate over each pipeline and create a DAG for it.
+#  Within each pipeline, iterate over its tasks and apply them to the DAG.
+
+import os
+from datetime import datetime
+
+import yaml
+from airflow import DAG
+
+from rainbow.runners.airflow.tasks.python import PythonTask
+
+
+def register_dags(path):
+    files = []
+    for root, _, file_names in os.walk(path):
+        for file_name in file_names:
+            if file_name.endswith(('.yml', '.yaml')):
+                files.append(os.path.join(root, file_name))
+
+    print(files)
+
+    dags = []
+
+    for config_file in files:
+        print(f'Registering DAG for file: {config_file}')
+
+        with open(config_file) as stream:
+            # TODO: validate config
+            config = yaml.safe_load(stream)
+
+            for pipeline in config['pipelines']:
+                parent = None
+
+                default_args = {
+                    'owner': config['owner'],
+                    'start_date': datetime.combine(pipeline['start_date'], datetime.min.time())
+                }
+                # TODO: add all relevant airflow args
+                dag = DAG(
+                    dag_id=pipeline['pipeline'],
+                    default_args=default_args
+                )
+
+                for task in pipeline['tasks']:
+                    task_type = task['type']
+                    task_instance = get_task_class(task_type)(
+                        dag, pipeline['pipeline'], parent, task, 'all_success'
+                    )
+                    parent = task_instance.apply_task_to_dag()
+
+                    print(f'Applied {task_type} task: {parent}')
+
+                dags.append(dag)
+    return dags
+
+
+# TODO: task class registry
+task_classes = {
+    'python': PythonTask
+}
+
+
+def get_task_class(task_type):
+    return task_classes[task_type]
+
+
+if __name__ == '__main__':
+    # TODO: configurable yaml dir
+    path = 'tests/runners/airflow/dag/rainbow'
+    register_dags(path)
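
A note on DAG discovery, beyond the scope of this commit: Airflow's DagBag only
picks up DAG objects bound to module-level globals, so a small wrapper module
on the Airflow DAGs folder path would be needed to expose what register_dags()
builds. The module name and config path below are illustrative assumptions,
not part of this change.

    # dags/rainbow_loader.py (hypothetical Airflow entry point)
    from rainbow.runners.airflow.dag.rainbow_dags import register_dags

    # Bind each generated DAG to a module-level name so DagBag discovers it.
    for dag in register_dags('/opt/rainbow/configs'):  # assumed config dir
        globals()[dag.dag_id] = dag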
diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/model/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to rainbow/runners/airflow/model/__init__.py
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/model/task.py
similarity index 72%
copy from rainbow/runners/airflow/compiler/rainbow_compiler.py
copy to rainbow/runners/airflow/model/task.py
index 818fdc5..e74085d 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -16,11 +16,23 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Compiler for rainbows.
+Base task.
 """
-import yaml
 
 
-def parse_yaml(path):
-    with open(path, 'r') as stream:
-        return yaml.safe_load(stream)
+class Task:
+    """
+    Base class for tasks.
+    """
+
+    def setup(self):
+        """
+        Setup method for task.
+        """
+        raise NotImplementedError()
+
+    def apply_task_to_dag(self):
+        """
+        Applies this task to the DAG, chaining it under the parent task,
+        and returns the operator downstream tasks should chain from.
+        """
+        raise NotImplementedError()
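
To make the contract concrete, here is a minimal sketch of a subclass,
assuming the constructor signature used by rainbow_dags.py, namely
(dag, pipeline_name, parent, config, trigger_rule), and returning the operator
that downstream tasks chain from. NoOpTask is purely illustrative and not part
of this commit.

    from airflow.operators.dummy_operator import DummyOperator

    from rainbow.runners.airflow.model import task


    class NoOpTask(task.Task):
        # Hypothetical task that adds a placeholder operator to the DAG.
        def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
            self.dag = dag
            self.parent = parent
            self.config = config
            self.trigger_rule = trigger_rule

        def setup(self):
            pass

        def apply_task_to_dag(self):
            op = DummyOperator(task_id=self.config['task'],
                               trigger_rule=self.trigger_rule,
                               dag=self.dag)
            # Chain under the parent (if any) and return the new leaf.
            if self.parent:
                self.parent.set_downstream(op)
            return op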
diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/tasks/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to rainbow/runners/airflow/tasks/__init__.py
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
new file mode 100644
index 0000000..727e11c
--- /dev/null
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+
+from airflow.models import Variable
+from airflow.operators.dummy_operator import DummyOperator
+
+from rainbow.runners.airflow.model import task
+from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
+    ConfigurableKubernetesPodOperator, \
+    ConfigureParallelExecutionOperator
+
+
+class PythonTask(task.Task):
+    """
+    Python task.
+    """
+
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        self.dag = dag
+        self.parent = parent
+        self.config = config
+        self.trigger_rule = trigger_rule
+        self.input_type = config['input_type']
+        self.input_path = config['input_path']
+        self.task_name = config['task']
+        self.image = config['image']
+        self.resources = self.__resources_config(config)
+        self.env_vars = self.__env_vars(pipeline_name, config)
+        self.kubernetes_kwargs = self.__kubernetes_kwargs(
+            dag, self.env_vars, self.resources, self.task_name
+        )
+        self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments(config)
+        self.config_task_id = self.task_name + '_input'
+        self.executors = self.__executors(config)
+
+    def setup(self):
+        # TODO: build docker image if needed.
+        pass
+
+    def apply_task_to_dag(self):
+
+        def create_pod_operator(task_id, task_split, image):
+            return ConfigurableKubernetesPodOperator(
+                task_id=task_id,
+                config_task_id=self.config_task_id,
+                task_split=task_split,
+                image=image,
+                cmds=self.cmds,
+                arguments=self.arguments,
+                **self.kubernetes_kwargs
+            )
+
+        config_task = None
+
+        if self.input_type in ['static', 'task']:
+            self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+
+            config_task = ConfigureParallelExecutionOperator(
+                task_id=self.config_task_id,
+                image=self.image,
+                config_type=self.input_type,
+                config_path=self.input_path,
+                executors=self.executors,
+                **self.kubernetes_kwargs
+            )
+
+        if self.executors == 1:
+            pod_task = create_pod_operator(
+                task_id=self.task_name,
+                task_split=0,
+                image=self.image
+            )
+
+            first_task = pod_task
+
+            if config_task:
+                first_task = config_task
+                first_task.set_downstream(pod_task)
+
+            if self.parent:
+                self.parent.set_downstream(first_task)
+
+            return pod_task
+        else:
+            if not config_task:
+                config_task = DummyOperator(
+                    task_id=self.config_task_id,
+                    trigger_rule=self.trigger_rule,
+                    dag=self.dag
+                )
+
+            end_task = DummyOperator(
+                task_id=self.task_name,
+                dag=self.dag
+            )
+
+            if self.parent:
+                self.parent.set_downstream(config_task)
+
+            # Fan out one pod per executor split; each split chains from the
+            # config task and into the end task.
+            for i in range(self.executors):
+                split_task = create_pod_operator(
+                    task_id=f'{self.task_name}_{i}',
+                    task_split=i,
+                    image=self.image
+                )
+
+                config_task.set_downstream(split_task)
+                split_task.set_downstream(end_task)
+
+            return end_task
+
+    @staticmethod
+    def __executors(config):
+        return config.get('executors', 1)
+
+    @staticmethod
+    def __kubernetes_cmds_and_arguments(config):
+        cmds = ['/bin/bash', '-c']
+        arguments = [
+            "sh container-setup.sh && "
+            f"{config['cmd']} && "
+            f"sh container-teardown.sh {config['output_path']}"
+        ]
+        return cmds, arguments
+
+    @staticmethod
+    def __kubernetes_kwargs(dag, env_vars, resources, task_name):
+        kubernetes_kwargs = {
+            'namespace': Variable.get('kubernetes_namespace', default_var='default'),
+            'name': task_name.replace('_', '-'),
+            'in_cluster': Variable.get('in_kubernetes_cluster', default_var=False),
+            'image_pull_policy': Variable.get('image_pull_policy', default_var='IfNotPresent'),
+            'get_logs': True,
+            'env_vars': env_vars,
+            'do_xcom_push': True,
+            'is_delete_operator_pod': True,
+            'startup_timeout_seconds': 300,
+            'image_pull_secrets': 'regcred',
+            'resources': resources,
+            'dag': dag
+        }
+        return kubernetes_kwargs
+
+    @staticmethod
+    def __env_vars(pipeline_name, config):
+        env_vars = config.get('env_vars', {})
+        airflow_configuration_variable = Variable.get(
+            f'{pipeline_name}_dag_configuration',
+            default_var=None)
+        if airflow_configuration_variable:
+            airflow_configs = json.loads(airflow_configuration_variable)
+            environment_variables_key = f'{pipeline_name}_environment_variables'
+            if environment_variables_key in airflow_configs:
+                env_vars = airflow_configs[environment_variables_key]
+        return env_vars
+
+    @staticmethod
+    def __resources_config(config):
+        # Pass through only the resource keys present in the config.
+        return {key: config[key]
+                for key in ('request_cpu', 'request_memory',
+                            'limit_cpu', 'limit_memory')
+                if key in config}
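
A usage sketch for reference, with illustrative values only: the config keys
below are the ones PythonTask reads in __init__, and the sketch assumes a
configured Airflow environment, since the constructor falls back to Variable
defaults. With input_type 'static' and the default single executor this yields
a configuration task followed by one pod operator; setting executors above 1
would fan the work out into one pod per split.

    from datetime import datetime

    from airflow import DAG

    from rainbow.runners.airflow.tasks.python import PythonTask

    dag = DAG(dag_id='example',
              default_args={'start_date': datetime(1970, 1, 1)})

    config = {
        'task': 'my_task',
        'cmd': 'python -u my_app.py',  # command run inside the container
        'image': 'my_image',           # docker image for the pod
        'input_type': 'static',
        'input_path': '{"configs": [{"campaign_id": 10}]}',
        'output_path': '/my_output.json',
    }

    # Returns the operator that a downstream task would chain from.
    leaf = PythonTask(dag, 'my_pipeline', None, config,
                      'all_success').apply_task_to_dag()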
diff --git a/tests/runners/airflow/compiler/rainbow.yml b/tests/runners/airflow/compiler/rainbow.yml
deleted file mode 100644
index 45333a8..0000000
--- a/tests/runners/airflow/compiler/rainbow.yml
+++ /dev/null
@@ -1,115 +0,0 @@
-
----
-name: MyPipeline
-owner: Bosco Albert Baracus
-pipeline:
-  timeout-minutes: 45
-  schedule: 0 * 1 * *
-  metrics-namespace: TestNamespace
-  tasks:
-    - name: mytask1
-      type: sql
-      description: mytask1 is cool
-      query: "select * from mytable"
-      overrides:
-        - prod:
-          partition-columns: dt
-          output-table: test.test_impression_prod
-          output-path: s3://mybucket/myproject-test/impression
-          emr-cluster-name: spark-playground-prod
-        - stg:
-          query: "select * from mytable"
-          partition-columns: dt
-          output-table: test.test_impression_stg
-          output-path: s3://mybucket/haya-test/impression
-          emr-cluster-name: spark-playground-staging
-      tasks:
-        - name: my_static_config_task
-          type: python
-          description: my 1st ds task
-          artifact-id: mytask1artifactid
-          source: mytask1folder
-          env-vars:
-            env1: "a"
-            env2: "b"
-          config-type: static
-          config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
-          cmd: python -u my_app.py
-        - task:
-          name: my_no_config_task
-          type: python
-          description: my 2nd ds task
-          artifact-id: mytask1artifactid
-          env-vars:
-            env1: "a"
-            env2: "b"
-          request-cpu: 100m
-          request-memory: 65M
-          cmd: python -u my_app.py foo bar
-        - task:
-          name: my_create_custom_config_task
-          type: python
-          description: my 2nd ds task
-          artifact-id: myconftask
-          source: myconftask
-          output-config-path: /my_conf.json
-          env-vars:
-            env1: "a"
-            env2: "b"
-          cmd: python -u my_app.py foo bar
-        - task:
-          name: my_custom_config_task
-          type: python
-          description: my 2nd ds task
-          artifact-id: mytask1artifactid
-          config-type: task
-          config-path: my_create_custom_config_task
-          env-vars:
-            env1: "a"
-            env2: "b"
-          cmd: python -u my_app.py foo bar
-        - task:
-          name: my_parallelized_static_config_task
-          type: python
-          description: my 3rd ds task
-          artifact-id: mytask1artifactid
-          executors: 5
-          env-vars:
-            env1: "x"
-            env2: "y"
-            myconf: $CONFIG_FILE
-          config-type: static
-          config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 }, { \"campaign_id\": 30 }, { \"campaign_id\": 40 }, { \"campaign_id\": 50 }, { \"campaign_id\": 60 }, { \"campaign_id\": 70 }, { \"campaign_id\": 80 } ]}"
-          cmd: python -u my_app.py $CONFIG_FILE
-        - task:
-          name: my_parallelized_custom_config_task
-          type: python
-          description: my 4th ds task
-          artifact-id: mytask1artifactid
-          executors: 5
-          config-type: task
-          config-path: my_create_custom_config_task
-          cmd: python -u my_app.py
-        - task:
-          name: my_parallelized_no_config_task
-          type: python
-          description: my 4th ds task
-          artifact-id: mytask1artifactid
-          executors: 5
-          cmd: python -u my_app.py
-services:
-  - service:
-    name: myserver1
-    type: python-server
-    description: my python server
-    artifact-id: myserver1artifactid
-    source: myserver1logicfolder
-    endpoints:
-      - endpoint:
-        path: /myendpoint1
-        module: mymodule1
-        function: myfun1
-      - endpoint:
-        path: /myendpoint2
-        module: mymodule2
-        function: myfun2
diff --git a/tests/runners/airflow/compiler/test_rainbow_compiler.py b/tests/runners/airflow/compiler/test_rainbow_compiler.py
deleted file mode 100644
index 6e73d8f..0000000
--- a/tests/runners/airflow/compiler/test_rainbow_compiler.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-
-from rainbow.runners.airflow.compiler import rainbow_compiler
-
-
-class TestRainbowCompiler(unittest.TestCase):
-
-    def test_parse(self):
-        expected = {'name': 'MyPipeline', 'owner': 'Bosco Albert Baracus', 'pipeline': {'timeout-minutes': 45, 'schedule': '0 * 1 * *', 'metrics-namespace': 'TestNamespace', 'tasks': [{'name': 'mytask1', 'type': 'sql', 'description': 'mytask1 is cool', 'query': 'select * from mytable', 'overrides': [{'prod': None, 'partition-columns': 'dt', 'output-table': 'test.test_impression_prod', 'output-path': 's3://mybucket/myproject-test/impression', 'emr-cluster-name': 'spark-playground-prod'},  [...]
-        actual = rainbow_compiler.parse_yaml('tests/runners/airflow/compiler/rainbow.yml')
-        self.assertEqual(expected, actual)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/dag/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/dag/__init__.py
diff --git a/tests/runners/airflow/dag/rainbow/rainbow.yml b/tests/runners/airflow/dag/rainbow/rainbow.yml
new file mode 100644
index 0000000..07afd08
--- /dev/null
+++ b/tests/runners/airflow/dag/rainbow/rainbow.yml
@@ -0,0 +1,51 @@
+
+---
+name: MyPipeline
+owner: Bosco Albert Baracus
+pipelines:
+  - pipeline: my_pipeline
+    start_date: 1970-01-01
+    timeout-minutes: 45
+    schedule: 0 * 1 * *
+    metrics-namespace: TestNamespace
+    tasks:
+      - task: my_static_config_task
+        type: python
+        description: my 1st ds task
+        image: mytask1artifactid
+        source: mytask1folder
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+        output_path: 'baz'
+        cmd: 'foo bar'
+      - task: my_static_config_task2
+        type: python
+        description: my 1st ds task
+        image: mytask1artifactid
+        source: mytask1folder
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+        output_path: 'baz'
+        cmd: 'foo bar'
+services:
+  - service:
+    name: myserver1
+    type: python-server
+    description: my python server
+    artifact-id: myserver1artifactid
+    source: myserver1logicfolder
+    endpoints:
+      - endpoint:
+        path: /myendpoint1
+        module: mymodule1
+        function: myfun1
+      - endpoint:
+        path: /myendpoint2
+        module: mymodule2
+        function: myfun2
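
For orientation, the structure yaml.safe_load() yields for this file, abridged
to the keys rainbow_dags.py reads. Note that the unquoted start_date parses as
a datetime.date, which is why register_dags() passes it through
datetime.combine().

    import yaml

    with open('tests/runners/airflow/dag/rainbow/rainbow.yml') as stream:
        config = yaml.safe_load(stream)

    assert config['owner'] == 'Bosco Albert Baracus'
    pipeline = config['pipelines'][0]
    assert pipeline['pipeline'] == 'my_pipeline'
    assert str(pipeline['start_date']) == '1970-01-01'  # a datetime.date
    assert pipeline['tasks'][0]['task'] == 'my_static_config_task'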
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
new file mode 100644
index 0000000..41bea09
--- /dev/null
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -0,0 +1,11 @@
+from unittest import TestCase
+
+from rainbow.runners.airflow.dag import rainbow_dags
+
+
+class TestRainbowDags(TestCase):
+    def test_register_dags(self):
+        dags = rainbow_dags.register_dags("tests/runners/airflow/dag/rainbow")
+        self.assertEqual(len(dags), 1)
+        # TODO: elaborate test
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/tasks/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/tasks/__init__.py
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
new file mode 100644
index 0000000..4f5808b
--- /dev/null
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
+    ConfigurableKubernetesPodOperator
+from rainbow.runners.airflow.tasks import python
+from tests.util import dag_test_utils
+
+
+class TestPythonTask(TestCase):
+    def test_apply_task_to_dag(self):
+        # TODO: elaborate tests
+        dag = dag_test_utils.create_dag()
+
+        task_id = 'my_task'
+
+        config = {
+            'task': task_id,
+            'cmd': 'foo bar',
+            'image': 'my_image',
+            'input_type': 'my_input_type',
+            'input_path': 'my_input',
+            'output_path': '/my_output.json'
+        }
+
+        task0 = python.PythonTask(dag, 'my_pipeline', None, config, 'all_success')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
+        self.assertEqual(dag_task0.task_id, task_id)
diff --git a/rainbow/monitoring/__init__.py b/tests/util/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/util/__init__.py
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/tests/util/dag_test_utils.py
similarity index 76%
copy from rainbow/runners/airflow/compiler/rainbow_compiler.py
copy to tests/util/dag_test_utils.py
index 818fdc5..b1fbcab 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/tests/util/dag_test_utils.py
@@ -15,12 +15,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Compiler for rainbows.
-"""
-import yaml
 
 
-def parse_yaml(path):
-    with open(path, 'r') as stream:
-        return yaml.safe_load(stream)
+from datetime import datetime
+
+from airflow import DAG
+
+
+def create_dag():
+    """
+    Test util to create a basic DAG for testing.
+    """
+
+    return DAG(
+        dag_id='test_dag',
+        default_args={'start_date': datetime(1970, 1, 1)}
+    )