You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:24:50 UTC
[incubator-liminal] 10/43: Refactor PythonTask
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 9292bb85b77084f31c48c1c982a5a7f593fefa66
Author: aviemzur <av...@gmail.com>
AuthorDate: Thu Mar 12 10:14:55 2020 +0200
Refactor PythonTask
---
rainbow/runners/airflow/tasks/python.py | 122 +++++++++++++++++---------------
1 file changed, 65 insertions(+), 57 deletions(-)
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index 8317854..eb00c0e 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -58,76 +58,84 @@ class PythonTask(task.Task):
def apply_task_to_dag(self):
- def create_pod_operator(task_id, task_split, image):
- return ConfigurableKubernetesPodOperator(
- task_id=task_id,
- config_task_id=self.config_task_id,
- task_split=task_split,
- image=image,
- cmds=self.cmds,
- arguments=self.arguments,
- **self.kubernetes_kwargs
- )
-
config_task = None
if self.input_type in ['static', 'task']:
- self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
-
- config_task = ConfigureParallelExecutionOperator(
- task_id=self.config_task_id,
- image=self.image,
- config_type=self.input_type,
- config_path=self.input_path,
- executors=self.executors,
- **self.kubernetes_kwargs
- )
+ config_task = self.__config_task(config_task)
if self.executors == 1:
- pod_task = create_pod_operator(
- task_id=f'{self.task_name}',
- task_split=0,
- image=f'''{self.image}'''
- )
-
- first_task = pod_task
-
- if config_task:
- first_task = config_task
- first_task.set_downstream(pod_task)
-
- if self.parent:
- self.parent.set_downstream(first_task)
-
- return pod_task
+ return self.__apply_task_to_dag_single_executor(config_task)
else:
- if not config_task:
- config_task = DummyOperator(
- task_id=self.config_task_id,
- trigger_rule=self.trigger_rule,
- dag=self.dag
- )
+ return self.__apply_task_to_dag_multiple_executors(config_task)
- end_task = DummyOperator(
- task_id=self.task_name,
+ def __apply_task_to_dag_multiple_executors(self, config_task):
+ if not config_task:
+ config_task = DummyOperator(
+ task_id=self.config_task_id,
+ trigger_rule=self.trigger_rule,
dag=self.dag
)
- if self.parent:
- self.parent.set_downstream(config_task)
-
- for i in range(self.executors):
- split_task = create_pod_operator(
- task_id=f'''{self.task_name}_{i}''',
- task_split=i,
- image=self.image
- )
+ end_task = DummyOperator(
+ task_id=self.task_name,
+ dag=self.dag
+ )
- config_task.set_downstream(split_task)
+ if self.parent:
+ self.parent.set_downstream(config_task)
- split_task.set_downstream(end_task)
+ for i in range(self.executors):
+ split_task = self.__create_pod_operator(
+ task_id=f'''{self.task_name}_{i}''',
+ task_split=i,
+ image=self.image
+ )
- return end_task
+ config_task.set_downstream(split_task)
+
+ split_task.set_downstream(end_task)
+
+ return end_task
+
+ def __create_pod_operator(self, task_id, task_split, image):
+ return ConfigurableKubernetesPodOperator(
+ task_id=task_id,
+ config_task_id=self.config_task_id,
+ task_split=task_split,
+ image=image,
+ cmds=self.cmds,
+ arguments=self.arguments,
+ **self.kubernetes_kwargs
+ )
+
+ def __apply_task_to_dag_single_executor(self, config_task):
+ pod_task = self.__create_pod_operator(
+ task_id=f'{self.task_name}',
+ task_split=0,
+ image=f'''{self.image}'''
+ )
+
+ first_task = pod_task
+
+ if config_task:
+ first_task = config_task
+ first_task.set_downstream(pod_task)
+ if self.parent:
+ self.parent.set_downstream(first_task)
+
+ return pod_task
+
+ def __config_task(self, config_task):
+ self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+ config_task = ConfigureParallelExecutionOperator(
+ task_id=self.config_task_id,
+ image=self.image,
+ config_type=self.input_type,
+ config_path=self.input_path,
+ executors=self.executors,
+ **self.kubernetes_kwargs
+ )
+ return config_task
def __executors(self):
executors = 1