You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:24:51 UTC
[incubator-liminal] 11/43: Refactor build
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 0817e973ec27f9a25a8cacc464b0a043d8d47428
Author: aviemzur <av...@gmail.com>
AuthorDate: Thu Mar 12 11:13:58 2020 +0200
Refactor build
---
rainbow/{runners/airflow => }/build/__init__.py | 0
rainbow/build/build_rainbow.py | 57 +++++++++++++++
.../airflow => }/build/python/container-setup.sh | 0
.../build/python/container-teardown.sh | 0
rainbow/core/__init__.py | 1 -
.../hello_world => rainbow/core/util}/__init__.py | 0
rainbow/core/{__init__.py => util/files_util.py} | 12 +++-
rainbow/docker/python/python_image.py | 61 +++++++++-------
rainbow/runners/airflow/build/build_rainbow.py | 84 ----------------------
rainbow/runners/airflow/dag/rainbow_dags.py | 23 +++---
rainbow/runners/airflow/model/task.py | 6 --
.../airflow/tasks/create_cloudformation_stack.py | 3 -
.../airflow/tasks/delete_cloudformation_stack.py | 3 -
rainbow/runners/airflow/tasks/job_end.py | 3 -
rainbow/runners/airflow/tasks/job_start.py | 3 -
rainbow/runners/airflow/tasks/python.py | 9 ---
rainbow/runners/airflow/tasks/spark.py | 3 -
rainbow/runners/airflow/tasks/sql.py | 3 -
requirements.txt | 2 +
.../{tasks/hello_world => build}/__init__.py | 0
.../hello_world => build/python}/__init__.py | 0
.../airflow/build/python/test_python_image.py | 26 ++++---
tests/runners/airflow/build/test_build_rainbow.py | 8 +++
tests/runners/airflow/dag/test_rainbow_dags.py | 2 +-
.../{tasks/hello_world => rainbow}/__init__.py | 0
.../{tasks => rainbow}/hello_world/__init__.py | 0
.../{tasks => rainbow}/hello_world/hello_world.py | 0
.../runners/airflow/{dag => }/rainbow/rainbow.yml | 24 +++++--
tests/runners/airflow/tasks/test_python.py | 18 +----
29 files changed, 161 insertions(+), 190 deletions(-)
diff --git a/rainbow/runners/airflow/build/__init__.py b/rainbow/build/__init__.py
similarity index 100%
rename from rainbow/runners/airflow/build/__init__.py
rename to rainbow/build/__init__.py
diff --git a/rainbow/build/build_rainbow.py b/rainbow/build/build_rainbow.py
new file mode 100644
index 0000000..280b862
--- /dev/null
+++ b/rainbow/build/build_rainbow.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import yaml
+
+from rainbow.core.util import files_util
+from rainbow.docker.python.python_image import PythonImage
+
+
+def build_rainbow(path):
+ """
+ TODO: doc for build_rainbow
+ """
+
+ config_files = files_util.find_config_files(path)
+
+ for config_file in config_files:
+ print(f'Building artifacts file: f{config_file}')
+
+ with open(config_file) as stream:
+ # TODO: validate config
+ config = yaml.safe_load(stream)
+
+ for pipeline in config['pipelines']:
+ for task in pipeline['tasks']:
+ task_type = task['type']
+ task_instance = get_build_class(task_type)()
+ task_instance.build(base_path=os.path.dirname(config_file),
+ relative_source_path=task['source'],
+ tag=task['image'])
+
+
+# TODO: task class registry
+build_classes = {
+ 'python': PythonImage
+
+}
+
+
+def get_build_class(task_type):
+ return build_classes[task_type]
diff --git a/rainbow/runners/airflow/build/python/container-setup.sh b/rainbow/build/python/container-setup.sh
similarity index 100%
rename from rainbow/runners/airflow/build/python/container-setup.sh
rename to rainbow/build/python/container-setup.sh
diff --git a/rainbow/runners/airflow/build/python/container-teardown.sh b/rainbow/build/python/container-teardown.sh
similarity index 100%
rename from rainbow/runners/airflow/build/python/container-teardown.sh
rename to rainbow/build/python/container-teardown.sh
diff --git a/rainbow/core/__init__.py b/rainbow/core/__init__.py
index 2162b08..217e5db 100644
--- a/rainbow/core/__init__.py
+++ b/rainbow/core/__init__.py
@@ -15,4 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# TODO: core
diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/rainbow/core/util/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/hello_world/__init__.py
copy to rainbow/core/util/__init__.py
diff --git a/rainbow/core/__init__.py b/rainbow/core/util/files_util.py
similarity index 76%
copy from rainbow/core/__init__.py
copy to rainbow/core/util/files_util.py
index 2162b08..e5a8e09 100644
--- a/rainbow/core/__init__.py
+++ b/rainbow/core/util/files_util.py
@@ -15,4 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# TODO: core
+
+import os
+
+
+def find_config_files(path):
+ files = []
+ for r, d, f in os.walk(path):
+ for file in f:
+ if file[file.rfind('.') + 1:] in ['yml', 'yaml']:
+ files.append(os.path.join(r, file))
+ return files
diff --git a/rainbow/docker/python/python_image.py b/rainbow/docker/python/python_image.py
index d66dfbe..2cd3594 100644
--- a/rainbow/docker/python/python_image.py
+++ b/rainbow/docker/python/python_image.py
@@ -18,44 +18,57 @@
import os
import shutil
import tempfile
+
import docker
-def build(source_path, tag, extra_files=None):
- if extra_files is None:
- extra_files = []
+class PythonImage:
+
+ def build(self, base_path, relative_source_path, tag, extra_files=None):
+ """
+ TODO: pydoc
- print(f'Building image {tag}')
+ :param base_path:
+ :param relative_source_path:
+ :param tag:
+ :param extra_files:
+ :return:
+ """
- temp_dir = tempfile.mkdtemp()
- # Delete dir for shutil.copytree to work
- os.rmdir(temp_dir)
+ if extra_files is None:
+ extra_files = []
- __copy_source(source_path, temp_dir)
+ print(f'Building image {tag}')
- requirements_file_path = os.path.join(temp_dir, 'requirements.txt')
- if not os.path.exists(requirements_file_path):
- with open(requirements_file_path, 'w'):
- pass
+ temp_dir = tempfile.mkdtemp()
+ # Delete dir for shutil.copytree to work
+ os.rmdir(temp_dir)
- dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile')
+ self.__copy_source(os.path.join(base_path, relative_source_path), temp_dir)
- for file in extra_files + [dockerfile_path]:
- __copy_file(file, temp_dir)
+ requirements_file_path = os.path.join(temp_dir, 'requirements.txt')
+ if not os.path.exists(requirements_file_path):
+ with open(requirements_file_path, 'w'):
+ pass
- print(temp_dir, os.listdir(temp_dir))
+ dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile')
- docker_client = docker.from_env()
- docker_client.images.build(path=temp_dir, tag=tag)
+ for file in extra_files + [dockerfile_path]:
+ self.__copy_file(file, temp_dir)
- docker_client.close()
+ print(temp_dir, os.listdir(temp_dir))
- shutil.rmtree(temp_dir)
+ docker_client = docker.from_env()
+ docker_client.images.build(path=temp_dir, tag=tag)
+ docker_client.close()
-def __copy_source(source_path, destination_path):
- shutil.copytree(source_path, destination_path)
+ shutil.rmtree(temp_dir)
+ @staticmethod
+ def __copy_source(source_path, destination_path):
+ shutil.copytree(source_path, destination_path)
-def __copy_file(source_file_path, destination_file_path):
- shutil.copy2(source_file_path, destination_file_path)
+ @staticmethod
+ def __copy_file(source_file_path, destination_file_path):
+ shutil.copy2(source_file_path, destination_file_path)
diff --git a/rainbow/runners/airflow/build/build_rainbow.py b/rainbow/runners/airflow/build/build_rainbow.py
deleted file mode 100644
index 222ea5f..0000000
--- a/rainbow/runners/airflow/build/build_rainbow.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-import pprint
-from datetime import datetime
-
-import yaml
-from airflow import DAG
-
-from rainbow.runners.airflow.tasks.python import PythonTask
-
-
-def build_rainbow(path):
- """
- TODO: doc for build_rainbow
- """
- files = []
- for r, d, f in os.walk(path):
- for file in f:
- if file[file.rfind('.') + 1:] in ['yml', 'yaml']:
- files.append(os.path.join(r, file))
-
- print(files)
-
- dags = []
-
- for config_file in files:
- print(f'Building artifacts file: f{config_file}')
-
- with open(config_file) as stream:
- # TODO: validate config
- config = yaml.safe_load(stream)
- pp = pprint.PrettyPrinter(indent=4)
- # pp.pprint(config)
-
- for pipeline in config['pipelines']:
- parent = None
-
- default_args = {
- 'owner': config['owner'],
- 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time())
- }
- # TODO: add all relevant airflow args
- dag = DAG(
- dag_id='test_dag',
- default_args=default_args
- )
-
- for task in pipeline['tasks']:
- task_type = task['type']
- task_instance = get_task_class(task_type)(
- dag, pipeline['pipeline'], parent if parent else None, task, 'all_success'
- )
- parent = task_instance.build()
-
-
-# TODO: task class registry
-task_classes = {
- 'python': PythonTask
-}
-
-
-def get_task_class(task_type):
- return task_classes[task_type]
-
-
-if __name__ == '__main__':
- register_dags('')
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index c564737..639f0cc 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -16,38 +16,30 @@
# specific language governing permissions and limitations
# under the License.
-import os
-import pprint
from datetime import datetime
import yaml
from airflow import DAG
-from rainbow.runners.airflow.build import build_rainbow
+from rainbow.core.util import files_util
+from rainbow.runners.airflow.tasks.python import PythonTask
-def register_dags(path):
+def register_dags(configs_path):
"""
TODO: doc for register_dags
"""
- files = []
- for r, d, f in os.walk(path):
- for file in f:
- if file[file.rfind('.') + 1:] in ['yml', 'yaml']:
- files.append(os.path.join(r, file))
- print(files)
+ config_files = files_util.find_config_files(configs_path)
dags = []
- for config_file in files:
+ for config_file in config_files:
print(f'Registering DAG for file: f{config_file}')
with open(config_file) as stream:
# TODO: validate config
config = yaml.safe_load(stream)
- pp = pprint.PrettyPrinter(indent=4)
- # pp.pprint(config)
for pipeline in config['pipelines']:
parent = None
@@ -75,7 +67,10 @@ def register_dags(path):
return dags
-task_classes = build_rainbow.task_classes
+# TODO: task class registry
+task_classes = {
+ 'python': PythonTask
+}
def get_task_class(task_type):
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/model/task.py
index 25656ee..8163117 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -32,12 +32,6 @@ class Task:
self.config = config
self.trigger_rule = trigger_rule
- def build(self):
- """
- Build task's artifacts.
- """
- raise NotImplementedError()
-
def apply_task_to_dag(self):
"""
Registers Airflow operator to parent task.
diff --git a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
index c478dc7..ca8482a 100644
--- a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
@@ -27,8 +27,5 @@ class CreateCloudFormationStackTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
pass
diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
index d172284..8ac4e8b 100644
--- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -27,8 +27,5 @@ class DeleteCloudFormationStackTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
pass
diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/job_end.py
index a6c5ef2..53e1eef 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/job_end.py
@@ -27,8 +27,5 @@ class JobEndTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
pass
diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/job_start.py
index 7338363..5c82e1c 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/job_start.py
@@ -27,9 +27,6 @@ class JobStartTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
# TODO: job start task
pass
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index eb00c0e..b2769c8 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -47,15 +47,6 @@ class PythonTask(task.Task):
self.config_task_id = self.task_name + '_input'
self.executors = self.__executors()
- def build(self):
- if 'source' in self.config:
- script_dir = os.path.dirname(__file__)
-
- python_image.build(self.config['source'], self.image, [
- os.path.join(script_dir, '../build/python/container-setup.sh'),
- os.path.join(script_dir, '../build/python/container-teardown.sh')
- ])
-
def apply_task_to_dag(self):
config_task = None
diff --git a/rainbow/runners/airflow/tasks/spark.py b/rainbow/runners/airflow/tasks/spark.py
index 8846f97..5822e92 100644
--- a/rainbow/runners/airflow/tasks/spark.py
+++ b/rainbow/runners/airflow/tasks/spark.py
@@ -27,8 +27,5 @@ class SparkTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
pass
diff --git a/rainbow/runners/airflow/tasks/sql.py b/rainbow/runners/airflow/tasks/sql.py
index 23458a9..42c02ce 100644
--- a/rainbow/runners/airflow/tasks/sql.py
+++ b/rainbow/runners/airflow/tasks/sql.py
@@ -27,8 +27,5 @@ class SparkTask(task.Task):
def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
super().__init__(dag, pipeline_name, parent, config, trigger_rule)
- def build(self):
- pass
-
def apply_task_to_dag(self):
pass
diff --git a/requirements.txt b/requirements.txt
index e952e4c..6e05d98 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
+botocore
+PyYAML
docker==4.2.0
apache-airflow==1.10.9
docker-pycreds==0.4.0
diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/build/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/hello_world/__init__.py
copy to tests/runners/airflow/build/__init__.py
diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/build/python/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/hello_world/__init__.py
copy to tests/runners/airflow/build/python/__init__.py
diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py b/tests/runners/airflow/build/python/test_python_image.py
similarity index 58%
copy from rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
copy to tests/runners/airflow/build/python/test_python_image.py
index d172284..c290720 100644
--- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/tests/runners/airflow/build/python/test_python_image.py
@@ -16,19 +16,23 @@
# specific language governing permissions and limitations
# under the License.
-from rainbow.runners.airflow.model import task
+import docker
+from rainbow.docker.python import python_image
-class DeleteCloudFormationStackTask(task.Task):
- """
- # TODO: Deletes cloud_formation stack.
- """
- def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
- super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+def test_build(self):
+ config = self.__create_conf('my_task')
- def build(self):
- pass
+ image_name = config['image']
- def apply_task_to_dag(self):
- pass
+ python_image.build('tests/runners/airflow/rainbow', 'hello_world', 'image_name')
+
+ # TODO: elaborate test of image, validate input/output
+
+ docker_client = docker.from_env()
+ docker_client.images.get(image_name)
+ container_log = docker_client.containers.run(image_name, "python hello_world.py")
+ docker_client.close()
+
+ self.assertEqual("b'Hello world!\\n'", str(container_log))
diff --git a/tests/runners/airflow/build/test_build_rainbow.py b/tests/runners/airflow/build/test_build_rainbow.py
new file mode 100644
index 0000000..c8fec6e
--- /dev/null
+++ b/tests/runners/airflow/build/test_build_rainbow.py
@@ -0,0 +1,8 @@
+from unittest import TestCase
+
+from rainbow.build import build_rainbow
+
+
+class Test(TestCase):
+ def test_build_rainbow(self):
+ build_rainbow.build_rainbow('tests/runners/airflow/rainbow')
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
index c66e3bc..2a65f31 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -6,7 +6,7 @@ import unittest
class Test(TestCase):
def test_register_dags(self):
- dags = rainbow_dags.register_dags("tests/runners/airflow/dag/rainbow")
+ dags = rainbow_dags.register_dags('tests/runners/airflow/rainbow')
self.assertEqual(len(dags), 1)
# TODO: elaborate test
pass
diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/rainbow/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/hello_world/__init__.py
copy to tests/runners/airflow/rainbow/__init__.py
diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/rainbow/hello_world/__init__.py
similarity index 100%
rename from tests/runners/airflow/tasks/hello_world/__init__.py
rename to tests/runners/airflow/rainbow/hello_world/__init__.py
diff --git a/tests/runners/airflow/tasks/hello_world/hello_world.py b/tests/runners/airflow/rainbow/hello_world/hello_world.py
similarity index 100%
rename from tests/runners/airflow/tasks/hello_world/hello_world.py
rename to tests/runners/airflow/rainbow/hello_world/hello_world.py
diff --git a/tests/runners/airflow/dag/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
similarity index 59%
rename from tests/runners/airflow/dag/rainbow/rainbow.yml
rename to tests/runners/airflow/rainbow/rainbow.yml
index 07afd08..fd30028 100644
--- a/tests/runners/airflow/dag/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -1,4 +1,20 @@
-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
---
name: MyPipeline
owner: Bosco Albert Baracus
@@ -12,8 +28,8 @@ pipelines:
- task: my_static_config_task
type: python
description: my 1st ds task
- image: mytask1artifactid
- source: mytask1folder
+ image: my_image
+ source: hello_world
env_vars:
env1: "a"
env2: "b"
@@ -25,7 +41,7 @@ pipelines:
type: python
description: my 1st ds task
image: mytask1artifactid
- source: mytask1folder
+ source: hello_world
env_vars:
env1: "a"
env2: "b"
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index 4bbbe9c..37a325a 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -46,29 +46,13 @@ class TestPythonTask(TestCase):
self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
self.assertEqual(dag_task0.task_id, task_id)
- def test_build(self):
- config = self.__create_conf('my_task')
-
- task0 = python.PythonTask(None, None, None, config, None)
- task0.build()
-
- # TODO: elaborate test of image, validate input/output
- image_name = config['image']
-
- docker_client = docker.from_env()
- docker_client.images.get(image_name)
- container_log = docker_client.containers.run(image_name, "python hello_world.py")
- docker_client.close()
-
- self.assertEqual("b'Hello world!\\n'", str(container_log))
-
@staticmethod
def __create_conf(task_id):
return {
'task': task_id,
'cmd': 'foo bar',
'image': 'my_image',
- 'source': 'tests/runners/airflow/tasks/hello_world',
+ 'source': 'tests/runners/airflow/rainbow/hello_world',
'input_type': 'my_input_type',
'input_path': 'my_input',
'output_path': '/my_output.json'