You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:24:50 UTC

[incubator-liminal] 10/43: Refactor PythonTask

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 9292bb85b77084f31c48c1c982a5a7f593fefa66
Author: aviemzur <av...@gmail.com>
AuthorDate: Thu Mar 12 10:14:55 2020 +0200

    Refactor PythonTask
---
 rainbow/runners/airflow/tasks/python.py | 122 +++++++++++++++++---------------
 1 file changed, 65 insertions(+), 57 deletions(-)

diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index 8317854..eb00c0e 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -58,76 +58,84 @@ class PythonTask(task.Task):
 
     def apply_task_to_dag(self):
 
-        def create_pod_operator(task_id, task_split, image):
-            return ConfigurableKubernetesPodOperator(
-                task_id=task_id,
-                config_task_id=self.config_task_id,
-                task_split=task_split,
-                image=image,
-                cmds=self.cmds,
-                arguments=self.arguments,
-                **self.kubernetes_kwargs
-            )
-
         config_task = None
 
         if self.input_type in ['static', 'task']:
-            self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
-
-            config_task = ConfigureParallelExecutionOperator(
-                task_id=self.config_task_id,
-                image=self.image,
-                config_type=self.input_type,
-                config_path=self.input_path,
-                executors=self.executors,
-                **self.kubernetes_kwargs
-            )
+            config_task = self.__config_task(config_task)  # builds a ConfigureParallelExecutionOperator
 
         if self.executors == 1:
-            pod_task = create_pod_operator(
-                task_id=f'{self.task_name}',
-                task_split=0,
-                image=f'''{self.image}'''
-            )
-
-            first_task = pod_task
-
-            if config_task:
-                first_task = config_task
-                first_task.set_downstream(pod_task)
-
-            if self.parent:
-                self.parent.set_downstream(first_task)
-
-            return pod_task
+            return self.__apply_task_to_dag_single_executor(config_task)  # returns the pod task
         else:
-            if not config_task:
-                config_task = DummyOperator(
-                    task_id=self.config_task_id,
-                    trigger_rule=self.trigger_rule,
-                    dag=self.dag
-                )
+            return self.__apply_task_to_dag_multiple_executors(config_task)  # returns the end task
 
-            end_task = DummyOperator(
-                task_id=self.task_name,
+    def __apply_task_to_dag_multiple_executors(self, config_task):
+        """Fan config_task out to one pod per executor, joined by end_task."""
+        if not config_task:
+            config_task = DummyOperator(
+                task_id=self.config_task_id, trigger_rule=self.trigger_rule,
                 dag=self.dag
             )
 
-            if self.parent:
-                self.parent.set_downstream(config_task)
-
-                for i in range(self.executors):
-                    split_task = create_pod_operator(
-                        task_id=f'''{self.task_name}_{i}''',
-                        task_split=i,
-                        image=self.image
-                    )
+        end_task = DummyOperator(
+            task_id=self.task_name,
+            dag=self.dag
+        )
 
-                    config_task.set_downstream(split_task)
+        if self.parent:
+            self.parent.set_downstream(config_task)
 
-                    split_task.set_downstream(end_task)
+        # Splits must be created even without a parent, or no pods ever run.
+        for i in range(self.executors):
+            split_task = self.__create_pod_operator(
+                task_id=f'''{self.task_name}_{i}''', task_split=i,
+                image=self.image
+            )
 
-            return end_task
+            config_task.set_downstream(split_task)
+
+            split_task.set_downstream(end_task)
+
+        return end_task
+
+    def __create_pod_operator(self, task_id, task_split, image):
+        # One operator per split; only id, split index and image vary per call.
+        return ConfigurableKubernetesPodOperator(
+            task_id=task_id,
+            task_split=task_split,
+            image=image,
+            config_task_id=self.config_task_id,
+            cmds=self.cmds, arguments=self.arguments,
+            **self.kubernetes_kwargs
+        )
+
+    def __apply_task_to_dag_single_executor(self, config_task):
+        # A single pod (split 0) does the work and is the returned tail task.
+        pod_task = self.__create_pod_operator(
+            task_id=f'{self.task_name}', task_split=0,
+            image=f'''{self.image}'''
+        )
+
+        # Wire parent -> [config_task ->] pod_task.
+        head = config_task if config_task else pod_task
+        if config_task:
+            config_task.set_downstream(pod_task)
+
+        if self.parent:
+            self.parent.set_downstream(head)
+
+        return pod_task
+
+    def __config_task(self, config_task=None):
+        # `config_task` is ignored (kept for call-site compatibility); a new
+        # operator is always returned. Side effect: sets DATA_PIPELINE_INPUT.
+        self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+        return ConfigureParallelExecutionOperator(
+            task_id=self.config_task_id,
+            image=self.image,
+            config_type=self.input_type, config_path=self.input_path,
+            executors=self.executors,
+            **self.kubernetes_kwargs
+        )
 
     def __executors(self):
         executors = 1